1use crate::ipc::{self, IpcMessage};
11use bincode;
12use fnv::FnvHasher;
13use libc::{
14 self, cmsghdr, linger, CMSG_DATA, CMSG_LEN, CMSG_SPACE, MAP_FAILED, MAP_SHARED, PROT_READ,
15 PROT_WRITE, SOCK_SEQPACKET, SOL_SOCKET,
16};
17use libc::{c_char, c_int, c_void, getsockopt, SO_LINGER, S_IFMT, S_IFSOCK};
18use libc::{iovec, msghdr, off_t, recvmsg, sendmsg};
19use libc::{sa_family_t, setsockopt, size_t, sockaddr, sockaddr_un, socketpair, socklen_t};
20use libc::{EAGAIN, EWOULDBLOCK};
21use mio::unix::SourceFd;
22use mio::{Events, Interest, Poll, Token};
23use std::cell::Cell;
24use std::cmp;
25use std::collections::HashMap;
26use std::convert::TryInto;
27use std::error::Error as StdError;
28use std::ffi::{c_uint, CString};
29use std::fmt::{self, Debug, Formatter};
30use std::hash::BuildHasherDefault;
31use std::io;
32use std::marker::PhantomData;
33use std::mem;
34use std::ops::{Deref, RangeFrom};
35use std::os::fd::RawFd;
36use std::ptr;
37use std::slice;
38use std::sync::atomic::{AtomicUsize, Ordering};
39use std::sync::{Arc, LazyLock};
40use std::thread;
41use std::time::{Duration, UNIX_EPOCH};
42use tempfile::{Builder, TempDir};
43
/// Number of file descriptors the receive-side control-message buffer is sized for
/// (see `UnixCmsg::new`).
const MAX_FDS_IN_CMSG: u32 = 64;

/// Bytes subtracted from the socket send-buffer size when computing the usable
/// fragment size (see `OsIpcSender::fragment_size`).
// NOTE(review): presumably headroom for per-packet kernel bookkeeping — confirm.
const RESERVED_SIZE: usize = 32;

/// Extra flags OR-ed into the socket type when sockets are created: close-on-exec on
/// Linux and illumos, nothing elsewhere.
#[cfg(any(target_os = "linux", target_os = "illumos"))]
const SOCK_FLAGS: c_int = libc::SOCK_CLOEXEC;
#[cfg(not(any(target_os = "linux", target_os = "illumos")))]
const SOCK_FLAGS: c_int = 0;

/// Flags passed to `recvmsg`: on Linux and illumos, received descriptors are marked
/// close-on-exec; nothing elsewhere.
#[cfg(any(target_os = "linux", target_os = "illumos"))]
const RECVMSG_FLAGS: c_int = libc::MSG_CMSG_CLOEXEC;
#[cfg(not(any(target_os = "linux", target_os = "illumos")))]
const RECVMSG_FLAGS: c_int = 0;

// Integer types used when filling `msghdr.msg_iovlen` and `msghdr.msg_controllen`
// (see `new_msghdr`); the aliases differ between the GNU environment and others.
#[cfg(target_env = "gnu")]
type IovLen = usize;
#[cfg(target_env = "gnu")]
type MsgControlLen = size_t;

#[cfg(not(target_env = "gnu"))]
type IovLen = i32;
#[cfg(not(target_env = "gnu"))]
type MsgControlLen = socklen_t;
70
71unsafe fn new_sockaddr_un(path: *const c_char) -> (sockaddr_un, usize) {
72 let mut sockaddr: sockaddr_un = mem::zeroed();
73 libc::strncpy(
74 sockaddr.sun_path.as_mut_ptr(),
75 path,
76 sockaddr.sun_path.len() - 1,
77 );
78 sockaddr.sun_family = libc::AF_UNIX as sa_family_t;
79 (sockaddr, mem::size_of::<sockaddr_un>())
80}
81
/// Socket send-buffer size (`SO_SNDBUF`), determined lazily on first use by creating
/// a throw-away channel and querying its sender.
///
/// Panics on first access if the channel cannot be created or the option cannot be read.
static SYSTEM_SENDBUF_SIZE: LazyLock<usize> = LazyLock::new(|| {
    let (tx, _) = channel().expect("Failed to obtain a socket for checking maximum send size");
    tx.get_system_sendbuf_size()
        .expect("Failed to obtain maximum send size for socket")
});

/// Process id, looked up once; used in shared-memory names (see `BackingStore::new`).
static PID: LazyLock<u32> = LazyLock::new(std::process::id);

/// Counter incremented for every `BackingStore::new` call; used in shared-memory names.
static SHM_COUNT: AtomicUsize = AtomicUsize::new(0);
93
94pub fn channel() -> Result<(OsIpcSender, OsIpcReceiver), UnixError> {
95 let mut results = [0, 0];
96 unsafe {
97 if socketpair(
98 libc::AF_UNIX,
99 SOCK_SEQPACKET | SOCK_FLAGS,
100 0,
101 &mut results[0],
102 ) >= 0
103 {
104 Ok((
105 OsIpcSender::from_fd(results[0]),
106 OsIpcReceiver::from_fd(results[1]),
107 ))
108 } else {
109 Err(UnixError::last())
110 }
111 }
112}
113
/// Bookkeeping record for one receiver registered in an `OsIpcReceiverSet`.
#[derive(Clone, Copy)]
struct PollEntry {
    /// Identifier handed back to the caller by `OsIpcReceiverSet::add`.
    pub id: u64,
    /// File descriptor of the registered receiver.
    pub fd: RawFd,
}
119
120#[derive(PartialEq, Debug)]
121pub struct OsIpcReceiver {
122 fd: Cell<c_int>,
123}
124
125impl Drop for OsIpcReceiver {
126 fn drop(&mut self) {
127 unsafe {
128 let fd = self.fd.get();
129 if fd >= 0 {
130 let result = libc::close(fd);
131 assert!(
132 thread::panicking() || result == 0,
133 "closed receiver (fd: {}): {}",
134 fd,
135 UnixError::last(),
136 );
137 }
138 }
139 }
140}
141
142impl OsIpcReceiver {
143 fn from_fd(fd: c_int) -> OsIpcReceiver {
144 OsIpcReceiver { fd: Cell::new(fd) }
145 }
146
147 fn consume_fd(&self) -> c_int {
148 let fd = self.fd.get();
149 self.fd.set(-1);
150 fd
151 }
152
153 pub fn consume(&self) -> OsIpcReceiver {
154 OsIpcReceiver::from_fd(self.consume_fd())
155 }
156
157 #[allow(clippy::type_complexity)]
158 pub fn recv(&self) -> Result<IpcMessage, UnixError> {
159 recv(self.fd.get(), BlockingMode::Blocking)
160 }
161
162 #[allow(clippy::type_complexity)]
163 pub fn try_recv(&self) -> Result<IpcMessage, UnixError> {
164 recv(self.fd.get(), BlockingMode::Nonblocking)
165 }
166
167 #[allow(clippy::type_complexity)]
168 pub fn try_recv_timeout(&self, duration: Duration) -> Result<IpcMessage, UnixError> {
169 recv(self.fd.get(), BlockingMode::Timeout(duration))
170 }
171}
172
173#[derive(PartialEq, Debug)]
174struct SharedFileDescriptor(c_int);
175
176impl Drop for SharedFileDescriptor {
177 fn drop(&mut self) {
178 unsafe {
179 let result = libc::close(self.0);
180 assert!(thread::panicking() || result == 0);
181 }
182 }
183}
184
185#[derive(PartialEq, Debug, Clone)]
186pub struct OsIpcSender {
187 fd: Arc<SharedFileDescriptor>,
188 nosync_marker: PhantomData<Cell<()>>,
193}
194
195impl OsIpcSender {
196 fn from_fd(fd: c_int) -> OsIpcSender {
197 OsIpcSender {
198 fd: Arc::new(SharedFileDescriptor(fd)),
199 nosync_marker: PhantomData,
200 }
201 }
202
203 fn get_system_sendbuf_size(&self) -> Result<usize, UnixError> {
208 unsafe {
209 let mut socket_sendbuf_size: c_int = 0;
210 let mut socket_sendbuf_size_len = mem::size_of::<c_int>() as socklen_t;
211 if getsockopt(
212 self.fd.0,
213 libc::SOL_SOCKET,
214 libc::SO_SNDBUF,
215 &mut socket_sendbuf_size as *mut _ as *mut c_void,
216 &mut socket_sendbuf_size_len as *mut socklen_t,
217 ) < 0
218 {
219 return Err(UnixError::last());
220 }
221 Ok(socket_sendbuf_size.try_into().unwrap())
222 }
223 }
224
225 fn fragment_size(sendbuf_size: usize) -> usize {
233 sendbuf_size - RESERVED_SIZE
234 }
235
236 fn first_fragment_size(sendbuf_size: usize) -> usize {
240 (Self::fragment_size(sendbuf_size) - mem::size_of::<usize>()) & (!8usize + 1)
241 }
243
244 pub fn get_max_fragment_size() -> usize {
254 Self::first_fragment_size(*SYSTEM_SENDBUF_SIZE)
255 }
256
257 pub fn send(
258 &self,
259 data: &[u8],
260 channels: Vec<OsIpcChannel>,
261 shared_memory_regions: Vec<OsIpcSharedMemory>,
262 ) -> Result<(), UnixError> {
263 let mut fds = Vec::new();
264 for channel in channels.iter() {
265 fds.push(channel.fd());
266 }
267 for shared_memory_region in shared_memory_regions.iter() {
268 fds.push(shared_memory_region.store.fd());
269 }
270
271 fn send_first_fragment(
278 sender_fd: c_int,
279 fds: &[c_int],
280 data_buffer: &[u8],
281 len: usize,
282 ) -> Result<(), UnixError> {
283 let result = unsafe {
284 let cmsg_length = mem::size_of_val(fds) as c_uint;
285 let (cmsg_buffer, cmsg_space) = if cmsg_length > 0 {
286 let cmsg_buffer =
287 libc::malloc(CMSG_SPACE(cmsg_length) as usize) as *mut cmsghdr;
288 if cmsg_buffer.is_null() {
289 return Err(UnixError::last());
290 }
291 (*cmsg_buffer).cmsg_len = CMSG_LEN(cmsg_length) as MsgControlLen;
292 (*cmsg_buffer).cmsg_level = libc::SOL_SOCKET;
293 (*cmsg_buffer).cmsg_type = libc::SCM_RIGHTS;
294
295 ptr::copy_nonoverlapping(
296 fds.as_ptr(),
297 CMSG_DATA(cmsg_buffer) as *mut c_int,
298 fds.len(),
299 );
300 (cmsg_buffer, CMSG_SPACE(cmsg_length))
301 } else {
302 (ptr::null_mut(), 0)
303 };
304
305 let mut iovec = [
306 iovec {
312 iov_base: &len as *const _ as *mut c_void,
313 iov_len: mem::size_of_val(&len),
314 },
315 iovec {
316 iov_base: data_buffer.as_ptr() as *mut c_void,
317 iov_len: data_buffer.len(),
318 },
319 ];
320
321 let msghdr = new_msghdr(&mut iovec, cmsg_buffer, cmsg_space as MsgControlLen);
322 let result = sendmsg(sender_fd, &msghdr, 0);
323 libc::free(cmsg_buffer as *mut c_void);
324 result
325 };
326
327 if result > 0 {
328 Ok(())
329 } else {
330 Err(UnixError::last())
331 }
332 }
333
334 fn send_followup_fragment(sender_fd: c_int, data_buffer: &[u8]) -> Result<(), UnixError> {
335 let result = unsafe {
336 libc::send(
337 sender_fd,
338 data_buffer.as_ptr() as *const c_void,
339 data_buffer.len(),
340 0,
341 )
342 };
343
344 if result > 0 {
345 Ok(())
346 } else {
347 Err(UnixError::last())
348 }
349 }
350
351 let mut sendbuf_size = *SYSTEM_SENDBUF_SIZE;
352
353 fn downsize(sendbuf_size: &mut usize, sent_size: usize) -> Result<(), ()> {
362 if sent_size > 2000 {
363 *sendbuf_size /= 2;
364 if *sendbuf_size >= sent_size {
366 *sendbuf_size = sent_size / 2;
367 }
368 Ok(())
369 } else {
370 Err(())
371 }
372 }
373
374 if data.len() <= Self::get_max_fragment_size() {
376 match send_first_fragment(self.fd.0, &fds[..], data, data.len()) {
377 Ok(_) => return Ok(()),
378 Err(error) => {
379 if !(matches!(error, UnixError::Errno(libc::ENOBUFS))
387 && downsize(&mut sendbuf_size, data.len()).is_ok())
388 {
389 return Err(error);
390 }
391 },
392 }
393 }
394
395 let (dedicated_tx, dedicated_rx) = channel()?;
403 fds.push(dedicated_rx.fd.get());
405
406 let mut byte_position = 0;
408 while byte_position < data.len() {
409 let end_byte_position;
410 let result = if byte_position == 0 {
411 end_byte_position = Self::first_fragment_size(sendbuf_size);
416 send_first_fragment(self.fd.0, &fds[..], &data[..end_byte_position], data.len())
417 } else {
418 end_byte_position = cmp::min(
421 byte_position + Self::fragment_size(sendbuf_size),
422 data.len(),
423 );
424 send_followup_fragment(dedicated_tx.fd.0, &data[byte_position..end_byte_position])
425 };
426
427 if let Err(error) = result {
428 if matches!(error, UnixError::Errno(libc::ENOBUFS))
429 && downsize(&mut sendbuf_size, end_byte_position - byte_position).is_ok()
430 {
431 continue;
434 } else {
435 return Err(error);
436 }
437 }
438
439 byte_position = end_byte_position;
440 }
441
442 Ok(())
443 }
444
445 pub fn connect(name: String) -> Result<OsIpcSender, UnixError> {
446 let name = CString::new(name).unwrap();
447 unsafe {
448 let fd = libc::socket(libc::AF_UNIX, SOCK_SEQPACKET | SOCK_FLAGS, 0);
449 let (sockaddr, len) = new_sockaddr_un(name.as_ptr());
450 if libc::connect(
451 fd,
452 &sockaddr as *const _ as *const sockaddr,
453 len as socklen_t,
454 ) < 0
455 {
456 return Err(UnixError::last());
457 }
458
459 Ok(OsIpcSender::from_fd(fd))
460 }
461 }
462}
463
464#[derive(PartialEq, Debug)]
465pub enum OsIpcChannel {
466 Sender(OsIpcSender),
467 Receiver(OsIpcReceiver),
468}
469
470impl OsIpcChannel {
471 fn fd(&self) -> c_int {
472 match *self {
473 OsIpcChannel::Sender(ref sender) => sender.fd.0,
474 OsIpcChannel::Receiver(ref receiver) => receiver.fd.get(),
475 }
476 }
477}
478
479pub struct OsIpcReceiverSet {
480 incrementor: RangeFrom<u64>,
481 poll: Poll,
482 pollfds: HashMap<Token, PollEntry, BuildHasherDefault<FnvHasher>>,
483 events: Events,
484}
485
486impl Drop for OsIpcReceiverSet {
487 fn drop(&mut self) {
488 for &PollEntry { id: _, fd } in self.pollfds.values() {
489 let result = unsafe { libc::close(fd) };
490 assert!(thread::panicking() || result == 0);
491 }
492 }
493}
494
495impl OsIpcReceiverSet {
496 pub fn new() -> Result<OsIpcReceiverSet, UnixError> {
497 let fnv = BuildHasherDefault::<FnvHasher>::default();
498 Ok(OsIpcReceiverSet {
499 incrementor: 0..,
500 poll: Poll::new()?,
501 pollfds: HashMap::with_hasher(fnv),
502 events: Events::with_capacity(10),
503 })
504 }
505
506 pub fn add(&mut self, receiver: OsIpcReceiver) -> Result<u64, UnixError> {
507 let last_index = self.incrementor.next().unwrap();
508 let fd = receiver.consume_fd();
509 let fd_token = Token(fd as usize);
510 let poll_entry = PollEntry { id: last_index, fd };
511 self.poll
512 .registry()
513 .register(&mut SourceFd(&fd), fd_token, Interest::READABLE)?;
514 self.pollfds.insert(fd_token, poll_entry);
515 Ok(last_index)
516 }
517
518 pub fn select(&mut self) -> Result<Vec<OsIpcSelectionResult>, UnixError> {
519 loop {
521 match self.poll.poll(&mut self.events, None) {
522 Ok(()) if !self.events.is_empty() => break,
523 Ok(()) => {},
524 Err(ref error) => {
525 if error.kind() != io::ErrorKind::Interrupted {
526 return Err(UnixError::last());
527 }
528 },
529 }
530
531 if !self.events.is_empty() {
532 break;
533 }
534 }
535
536 let mut selection_results = Vec::new();
537 for event in self.events.iter() {
538 assert!(event.is_readable());
540
541 let event_token = event.token();
542 let poll_entry = self
543 .pollfds
544 .get(&event_token)
545 .expect("Got event for unknown token.")
546 .clone();
547 loop {
548 match recv(poll_entry.fd, BlockingMode::Nonblocking) {
549 Ok(ipc_message) => {
550 selection_results.push(OsIpcSelectionResult::DataReceived(
551 poll_entry.id,
552 ipc_message,
553 ));
554 },
555 Err(err) if err.channel_is_closed() => {
556 self.pollfds.remove(&event_token).unwrap();
557 self.poll
558 .registry()
559 .deregister(&mut SourceFd(&poll_entry.fd))
560 .unwrap();
561 unsafe {
562 libc::close(poll_entry.fd);
563 }
564
565 selection_results.push(OsIpcSelectionResult::ChannelClosed(poll_entry.id));
566 break;
567 },
568 Err(UnixError::Errno(code)) if code == EWOULDBLOCK => {
569 break;
573 },
574 Err(err) => return Err(err),
575 }
576 }
577 }
578
579 Ok(selection_results)
580 }
581}
582
583pub enum OsIpcSelectionResult {
584 DataReceived(u64, IpcMessage),
585 ChannelClosed(u64),
586}
587
588impl OsIpcSelectionResult {
589 pub fn unwrap(self) -> (u64, IpcMessage) {
590 match self {
591 OsIpcSelectionResult::DataReceived(id, ipc_message) => (id, ipc_message),
592 OsIpcSelectionResult::ChannelClosed(id) => {
593 panic!(
594 "OsIpcSelectionResult::unwrap(): receiver ID {} was closed!",
595 id
596 )
597 },
598 }
599 }
600}
601
602#[derive(PartialEq, Debug)]
603pub struct OsOpaqueIpcChannel {
604 fd: c_int,
605}
606
607impl Drop for OsOpaqueIpcChannel {
608 fn drop(&mut self) {
609 debug_assert!(self.fd == -1);
615 }
616}
617
618impl OsOpaqueIpcChannel {
619 fn from_fd(fd: c_int) -> OsOpaqueIpcChannel {
620 OsOpaqueIpcChannel { fd }
621 }
622
623 pub fn to_sender(&mut self) -> OsIpcSender {
624 OsIpcSender::from_fd(mem::replace(&mut self.fd, -1))
625 }
626
627 pub fn to_receiver(&mut self) -> OsIpcReceiver {
628 OsIpcReceiver::from_fd(mem::replace(&mut self.fd, -1))
629 }
630}
631
632pub struct OsIpcOneShotServer {
633 fd: c_int,
634
635 _temp_dir: TempDir,
639}
640
641impl Drop for OsIpcOneShotServer {
642 fn drop(&mut self) {
643 unsafe {
644 let result = libc::close(self.fd);
645 assert!(thread::panicking() || result == 0);
646 }
647 }
648}
649
650impl OsIpcOneShotServer {
651 pub fn new() -> Result<(OsIpcOneShotServer, String), UnixError> {
652 unsafe {
653 let fd = libc::socket(libc::AF_UNIX, SOCK_SEQPACKET | SOCK_FLAGS, 0);
654 let temp_dir = Builder::new().tempdir()?;
655 let socket_path = temp_dir.path().join("socket");
656 let path_string = socket_path.to_str().unwrap();
657
658 let path_c_string = CString::new(path_string).unwrap();
659 let (sockaddr, len) = new_sockaddr_un(path_c_string.as_ptr());
660 if libc::bind(
661 fd,
662 &sockaddr as *const _ as *const sockaddr,
663 len as socklen_t,
664 ) != 0
665 {
666 return Err(UnixError::last());
667 }
668
669 if libc::listen(fd, 10) != 0 {
670 return Err(UnixError::last());
671 }
672
673 Ok((
674 OsIpcOneShotServer {
675 fd,
676 _temp_dir: temp_dir,
677 },
678 path_string.to_string(),
679 ))
680 }
681 }
682
683 #[allow(clippy::type_complexity)]
684 pub fn accept(self) -> Result<(OsIpcReceiver, IpcMessage), UnixError> {
685 unsafe {
686 let sockaddr: *mut sockaddr = ptr::null_mut();
687 let sockaddr_len: *mut socklen_t = ptr::null_mut();
688 let client_fd = libc::accept(self.fd, sockaddr, sockaddr_len);
689 if client_fd < 0 {
690 return Err(UnixError::last());
691 }
692 make_socket_lingering(client_fd)?;
693
694 let receiver = OsIpcReceiver::from_fd(client_fd);
695 let ipc_message = receiver.recv()?;
696 Ok((receiver, ipc_message))
697 }
698 }
699}
700
701fn make_socket_lingering(sockfd: c_int) -> Result<(), UnixError> {
706 let linger = linger {
707 l_onoff: 1,
708 l_linger: 30,
709 };
710 let err = unsafe {
711 setsockopt(
712 sockfd,
713 SOL_SOCKET,
714 SO_LINGER,
715 &linger as *const _ as *const c_void,
716 mem::size_of::<linger>() as socklen_t,
717 )
718 };
719 if err < 0 {
720 let error = UnixError::last();
721 if let UnixError::Errno(libc::EINVAL) = error {
722 } else {
745 return Err(error);
746 }
747 }
748 Ok(())
749}
750
751struct BackingStore {
752 fd: c_int,
753}
754
755impl BackingStore {
756 pub fn new(length: usize) -> BackingStore {
757 let count = SHM_COUNT.fetch_add(1, Ordering::Relaxed);
758 let timestamp = UNIX_EPOCH.elapsed().unwrap();
759 let name = CString::new(format!(
760 "/ipc-channel-shared-memory.{}.{}.{}.{}",
761 count,
762 *PID,
763 timestamp.as_secs(),
764 timestamp.subsec_nanos()
765 ))
766 .unwrap();
767 let fd = create_shmem(name, length);
768 Self::from_fd(fd)
769 }
770
771 pub fn from_fd(fd: c_int) -> BackingStore {
772 BackingStore { fd }
773 }
774
775 pub fn fd(&self) -> c_int {
776 self.fd
777 }
778
779 pub unsafe fn map_file(&self, length: Option<size_t>) -> (*mut u8, size_t) {
780 let length = length.unwrap_or_else(|| {
781 let mut st = mem::MaybeUninit::uninit();
782 if libc::fstat(self.fd, st.as_mut_ptr()) != 0 {
783 panic!("error stating fd {}: {}", self.fd, UnixError::last());
784 }
785 st.assume_init().st_size as size_t
786 });
787 if length == 0 {
788 return (ptr::null_mut(), length);
790 }
791 let address = libc::mmap(
792 ptr::null_mut(),
793 length,
794 PROT_READ | PROT_WRITE,
795 MAP_SHARED,
796 self.fd,
797 0,
798 );
799 assert!(!address.is_null());
800 assert!(address != MAP_FAILED);
801 (address as *mut u8, length)
802 }
803}
804
805impl Drop for BackingStore {
806 fn drop(&mut self) {
807 unsafe {
808 let result = libc::close(self.fd);
809 assert!(thread::panicking() || result == 0);
810 }
811 }
812}
813
814pub struct OsIpcSharedMemory {
815 ptr: *mut u8,
816 length: usize,
817 store: BackingStore,
818}
819
820unsafe impl Send for OsIpcSharedMemory {}
821unsafe impl Sync for OsIpcSharedMemory {}
822
823impl Drop for OsIpcSharedMemory {
824 fn drop(&mut self) {
825 unsafe {
826 if !self.ptr.is_null() {
827 let result = libc::munmap(self.ptr as *mut c_void, self.length);
828 assert!(thread::panicking() || result == 0);
829 }
830 }
831 }
832}
833
834impl Clone for OsIpcSharedMemory {
835 fn clone(&self) -> OsIpcSharedMemory {
836 unsafe {
837 let store = BackingStore::from_fd(libc::dup(self.store.fd()));
838 let (address, _) = store.map_file(Some(self.length));
839 OsIpcSharedMemory::from_raw_parts(address, self.length, store)
840 }
841 }
842}
843
844impl PartialEq for OsIpcSharedMemory {
845 fn eq(&self, other: &OsIpcSharedMemory) -> bool {
846 **self == **other
847 }
848}
849
850impl Debug for OsIpcSharedMemory {
851 fn fmt(&self, formatter: &mut Formatter) -> Result<(), fmt::Error> {
852 (**self).fmt(formatter)
853 }
854}
855
856impl Deref for OsIpcSharedMemory {
857 type Target = [u8];
858
859 #[inline]
860 fn deref(&self) -> &[u8] {
861 unsafe { slice::from_raw_parts(self.ptr, self.length) }
862 }
863}
864
865impl OsIpcSharedMemory {
866 #[inline]
867 pub unsafe fn deref_mut(&mut self) -> &mut [u8] {
868 unsafe { slice::from_raw_parts_mut(self.ptr, self.length) }
869 }
870}
871
872impl OsIpcSharedMemory {
873 unsafe fn from_raw_parts(
874 ptr: *mut u8,
875 length: usize,
876 store: BackingStore,
877 ) -> OsIpcSharedMemory {
878 OsIpcSharedMemory { ptr, length, store }
879 }
880
881 unsafe fn from_fd(fd: c_int) -> OsIpcSharedMemory {
882 let store = BackingStore::from_fd(fd);
883 let (ptr, length) = store.map_file(None);
884 OsIpcSharedMemory::from_raw_parts(ptr, length, store)
885 }
886
887 pub fn from_byte(byte: u8, length: usize) -> OsIpcSharedMemory {
888 unsafe {
889 let store = BackingStore::new(length);
890 let (address, _) = store.map_file(Some(length));
891 for element in slice::from_raw_parts_mut(address, length) {
892 *element = byte;
893 }
894 OsIpcSharedMemory::from_raw_parts(address, length, store)
895 }
896 }
897
898 pub fn from_bytes(bytes: &[u8]) -> OsIpcSharedMemory {
899 unsafe {
900 let store = BackingStore::new(bytes.len());
901 let (address, _) = store.map_file(Some(bytes.len()));
902 ptr::copy_nonoverlapping(bytes.as_ptr(), address, bytes.len());
903 OsIpcSharedMemory::from_raw_parts(address, bytes.len(), store)
904 }
905 }
906}
907
/// Errors produced by the Unix IPC backend.
#[derive(Debug)]
pub enum UnixError {
    /// A system call failed with this `errno` value.
    Errno(c_int),
    /// The peer side of the channel is gone.
    ChannelClosed,
    /// An I/O error that is wrapped as-is.
    IoError(io::Error),
}

impl UnixError {
    /// Captures the calling thread's most recent OS error as an `Errno`.
    fn last() -> UnixError {
        let errno = io::Error::last_os_error().raw_os_error().unwrap();
        UnixError::Errno(errno)
    }

    /// Returns `true` only for `UnixError::ChannelClosed`.
    #[allow(dead_code)]
    pub fn channel_is_closed(&self) -> bool {
        match self {
            UnixError::ChannelClosed => true,
            UnixError::Errno(_) | UnixError::IoError(_) => false,
        }
    }
}

impl fmt::Display for UnixError {
    /// `Errno` is rendered like the corresponding `io::Error`; `IoError` delegates to
    /// the wrapped error.
    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
        match self {
            UnixError::ChannelClosed => fmt.write_str("All senders for this socket closed"),
            UnixError::IoError(e) => write!(fmt, "{e}"),
            UnixError::Errno(errno) => {
                let os_error = io::Error::from_raw_os_error(*errno);
                fmt::Display::fmt(&os_error, fmt)
            },
        }
    }
}

impl StdError for UnixError {}
939
940impl From<UnixError> for bincode::Error {
941 fn from(unix_error: UnixError) -> Self {
942 io::Error::from(unix_error).into()
943 }
944}
945
946impl From<UnixError> for io::Error {
947 fn from(unix_error: UnixError) -> io::Error {
948 match unix_error {
949 UnixError::Errno(errno) => io::Error::from_raw_os_error(errno),
950 UnixError::ChannelClosed => io::Error::new(io::ErrorKind::ConnectionReset, unix_error),
951 UnixError::IoError(e) => e,
952 }
953 }
954}
955
956impl From<UnixError> for ipc::IpcError {
957 fn from(error: UnixError) -> Self {
958 match error {
959 UnixError::ChannelClosed => ipc::IpcError::Disconnected,
960 e => ipc::IpcError::Io(io::Error::from(e)),
961 }
962 }
963}
964
965impl From<UnixError> for ipc::TryRecvError {
966 fn from(error: UnixError) -> Self {
967 match error {
968 UnixError::ChannelClosed => ipc::TryRecvError::IpcError(ipc::IpcError::Disconnected),
969 UnixError::Errno(code) if code == EAGAIN || code == EWOULDBLOCK => {
970 ipc::TryRecvError::Empty
971 },
972 e => ipc::TryRecvError::IpcError(ipc::IpcError::Io(io::Error::from(e))),
973 }
974 }
975}
976
977impl From<io::Error> for UnixError {
978 fn from(e: io::Error) -> UnixError {
979 if let Some(errno) = e.raw_os_error() {
980 UnixError::Errno(errno)
981 } else {
982 assert!(e.kind() == io::ErrorKind::ConnectionReset);
983 UnixError::ChannelClosed
984 }
985 }
986}
987
/// How a receive operation waits for data (see `UnixCmsg::recv`).
#[derive(Copy, Clone)]
enum BlockingMode {
    /// Call `recvmsg` directly and let it block.
    Blocking,
    /// Switch the descriptor to `O_NONBLOCK` for the duration of the `recvmsg` call.
    Nonblocking,
    /// `poll` the descriptor for at most this long before calling `recvmsg`.
    Timeout(Duration),
}
994
/// Receives one complete message from `fd`.
///
/// The first packet carries a `usize` header with the total message size, the first
/// part of the payload and any transferred descriptors. Received descriptors that
/// are sockets become opaque channels; all others are mapped as shared memory. If
/// the payload is larger than what arrived in the first packet, the last received
/// channel is treated as a dedicated receiver from which the rest is read.
///
/// # Errors
///
/// Returns `ChannelClosed` when a receive call reads zero bytes, or the `errno` of
/// a failing system call.
///
/// # Panics
///
/// Panics if more data is announced than was received but no channel was passed
/// along with the first packet.
#[allow(clippy::uninit_vec, clippy::type_complexity)]
fn recv(fd: c_int, blocking_mode: BlockingMode) -> Result<IpcMessage, UnixError> {
    let (mut channels, mut shared_memory_regions) = (Vec::new(), Vec::new());

    // Filled in by `recvmsg` from the header of the first packet.
    let mut total_size = 0usize;
    let mut main_data_buffer;
    unsafe {
        // Expose the whole (uninitialised) capacity so the kernel can write into it;
        // the length is corrected to the bytes actually received right below.
        main_data_buffer = Vec::with_capacity(OsIpcSender::get_max_fragment_size());
        main_data_buffer.set_len(OsIpcSender::get_max_fragment_size());

        let mut iovec = [
            // Header: total size of the message.
            iovec {
                iov_base: &mut total_size as *mut _ as *mut c_void,
                iov_len: mem::size_of_val(&total_size),
            },
            // Payload of the first fragment.
            iovec {
                iov_base: main_data_buffer.as_mut_ptr() as *mut c_void,
                iov_len: main_data_buffer.len(),
            },
        ];
        let mut cmsg = UnixCmsg::new(&mut iovec)?;

        let bytes_read = cmsg.recv(fd, blocking_mode)?;
        // NOTE(review): assumes at least the full header was received; a shorter read
        // would underflow this subtraction.
        main_data_buffer.set_len(bytes_read - mem::size_of_val(&total_size));

        let cmsg_fds = CMSG_DATA(cmsg.cmsg_buffer) as *const c_int;
        let cmsg_length = cmsg.msghdr.msg_controllen;
        // Number of descriptors = control-message payload bytes / size of one fd.
        let channel_length = if cmsg_length == 0 {
            0
        } else {
            (cmsg.cmsg_len() - CMSG_SPACE(0) as size_t) / mem::size_of::<c_int>()
        };
        for index in 0..channel_length {
            let fd = *cmsg_fds.add(index);
            if is_socket(fd) {
                channels.push(OsOpaqueIpcChannel::from_fd(fd));
                continue;
            }
            shared_memory_regions.push(OsIpcSharedMemory::from_fd(fd));
        }
    }

    // Everything arrived in the first packet.
    if total_size == main_data_buffer.len() {
        return Ok(IpcMessage::new(
            main_data_buffer,
            channels,
            shared_memory_regions,
        ));
    }

    // Fragmented message: the sender appended the dedicated channel's receiving end
    // as the last descriptor.
    let dedicated_rx = channels.pop().unwrap().to_receiver();

    let len = main_data_buffer.len();
    main_data_buffer.reserve_exact(total_size - len);

    // Append follow-up fragments until the announced size has been received.
    while main_data_buffer.len() < total_size {
        let write_pos = main_data_buffer.len();
        let end_pos = cmp::min(
            write_pos + OsIpcSender::fragment_size(*SYSTEM_SENDBUF_SIZE),
            total_size,
        );
        let result = unsafe {
            assert!(end_pos <= main_data_buffer.capacity());
            // Temporarily extend the length so the target range can be sliced.
            main_data_buffer.set_len(end_pos);

            assert!(end_pos >= write_pos);

            let result = libc::recv(
                dedicated_rx.fd.get(),
                main_data_buffer[write_pos..].as_mut_ptr() as *mut c_void,
                end_pos - write_pos,
                0,
            );
            // Keep only the bytes actually received (none on error).
            main_data_buffer.set_len(write_pos + cmp::max(result, 0) as usize);
            result
        };

        match result.cmp(&0) {
            cmp::Ordering::Greater => continue,
            cmp::Ordering::Equal => return Err(UnixError::ChannelClosed),
            cmp::Ordering::Less => return Err(UnixError::last()),
        }
    }

    Ok(IpcMessage::new(
        main_data_buffer,
        channels,
        shared_memory_regions,
    ))
}
1104
1105fn new_msghdr(iovec: &mut [iovec], cmsg_buffer: *mut cmsghdr, cmsg_space: MsgControlLen) -> msghdr {
1107 let mut msghdr: msghdr = unsafe { mem::zeroed() };
1108 msghdr.msg_name = ptr::null_mut();
1109 msghdr.msg_namelen = 0;
1110 msghdr.msg_iov = iovec.as_mut_ptr();
1111 msghdr.msg_iovlen = iovec.len() as IovLen;
1112 msghdr.msg_control = cmsg_buffer as *mut c_void;
1113 msghdr.msg_controllen = cmsg_space;
1114 msghdr.msg_flags = 0;
1115 msghdr
1116}
1117
1118fn create_shmem(name: CString, length: usize) -> c_int {
1119 unsafe {
1120 let fd = libc::memfd_create(name.as_ptr(), libc::MFD_CLOEXEC);
1121 assert!(fd >= 0);
1122 assert_eq!(libc::ftruncate(fd, length as off_t), 0);
1123 fd
1124 }
1125}
1126
1127struct UnixCmsg {
1128 cmsg_buffer: *mut cmsghdr,
1129 msghdr: msghdr,
1130}
1131
1132unsafe impl Send for UnixCmsg {}
1133
1134impl Drop for UnixCmsg {
1135 fn drop(&mut self) {
1136 unsafe {
1137 libc::free(self.cmsg_buffer as *mut c_void);
1138 }
1139 }
1140}
1141
1142impl UnixCmsg {
1143 unsafe fn new(iovec: &mut [iovec]) -> Result<UnixCmsg, UnixError> {
1144 let cmsg_length = CMSG_SPACE(MAX_FDS_IN_CMSG * (mem::size_of::<c_int>() as c_uint));
1145 let cmsg_buffer = libc::malloc(cmsg_length as usize) as *mut cmsghdr;
1146 if cmsg_buffer.is_null() {
1147 return Err(UnixError::last());
1148 }
1149 Ok(UnixCmsg {
1150 cmsg_buffer,
1151 msghdr: new_msghdr(iovec, cmsg_buffer, cmsg_length as MsgControlLen),
1152 })
1153 }
1154
1155 unsafe fn recv(&mut self, fd: c_int, blocking_mode: BlockingMode) -> Result<usize, UnixError> {
1156 match blocking_mode {
1157 BlockingMode::Nonblocking => {
1158 if libc::fcntl(fd, libc::F_SETFL, libc::O_NONBLOCK) < 0 {
1159 return Err(UnixError::last());
1160 }
1161 },
1162 BlockingMode::Timeout(duration) => {
1163 let events = libc::POLLIN | libc::POLLPRI | libc::POLLRDHUP;
1164
1165 let mut fd = [libc::pollfd {
1166 fd,
1167 events,
1168 revents: 0,
1169 }];
1170 let result = libc::poll(
1171 fd.as_mut_ptr(),
1172 fd.len() as _,
1173 duration.as_millis().try_into().unwrap_or(-1),
1174 );
1175
1176 match result.cmp(&0) {
1177 cmp::Ordering::Equal => return Err(UnixError::Errno(EAGAIN)),
1178 cmp::Ordering::Less => return Err(UnixError::last()),
1179 cmp::Ordering::Greater => {},
1180 }
1181 },
1182 BlockingMode::Blocking => {},
1183 }
1184
1185 let result = recvmsg(fd, &mut self.msghdr, RECVMSG_FLAGS);
1186
1187 let result = match result.cmp(&0) {
1188 cmp::Ordering::Equal => Err(UnixError::ChannelClosed),
1189 cmp::Ordering::Less => Err(UnixError::last()),
1190 cmp::Ordering::Greater => Ok(result as usize),
1191 };
1192
1193 if let BlockingMode::Nonblocking = blocking_mode {
1194 if libc::fcntl(fd, libc::F_SETFL, 0) < 0 {
1195 return Err(UnixError::last());
1196 }
1197 }
1198 result
1199 }
1200
1201 unsafe fn cmsg_len(&self) -> size_t {
1202 (*(self.msghdr.msg_control as *const cmsghdr)).cmsg_len as size_t
1203 }
1204}
1205
1206fn is_socket(fd: c_int) -> bool {
1207 unsafe {
1208 let mut st = mem::MaybeUninit::uninit();
1209 if libc::fstat(fd, st.as_mut_ptr()) != 0 {
1210 return false;
1211 }
1212 (st.assume_init().st_mode & S_IFMT) == S_IFSOCK
1213 }
1214}