1use crate::platform::{self, OsIpcChannel, OsIpcReceiver, OsIpcReceiverSet, OsIpcSender};
11use crate::platform::{
12 OsIpcOneShotServer, OsIpcSelectionResult, OsIpcSharedMemory, OsOpaqueIpcChannel,
13};
14
15use bincode;
16use serde::{Deserialize, Deserializer, Serialize, Serializer};
17use std::cell::RefCell;
18use std::cmp::min;
19use std::error::Error as StdError;
20use std::fmt::{self, Debug, Formatter};
21use std::io;
22use std::marker::PhantomData;
23use std::mem;
24use std::ops::Deref;
25use std::time::Duration;
26
27thread_local! {
28 static OS_IPC_CHANNELS_FOR_DESERIALIZATION: RefCell<Vec<OsOpaqueIpcChannel>> =
29 const { RefCell::new(Vec::new()) }
30}
31thread_local! {
32 static OS_IPC_SHARED_MEMORY_REGIONS_FOR_DESERIALIZATION:
33 RefCell<Vec<Option<OsIpcSharedMemory>>> = const { RefCell::new(Vec::new()) }
34}
35thread_local! {
36 static OS_IPC_CHANNELS_FOR_SERIALIZATION: RefCell<Vec<OsIpcChannel>> = const { RefCell::new(Vec::new()) }
37}
38thread_local! {
39 static OS_IPC_SHARED_MEMORY_REGIONS_FOR_SERIALIZATION: RefCell<Vec<OsIpcSharedMemory>> =
40 const { RefCell::new(Vec::new()) }
41}
42
43#[derive(Debug)]
44pub enum IpcError {
45 Bincode(bincode::Error),
46 Io(io::Error),
47 Disconnected,
48}
49
50impl fmt::Display for IpcError {
51 fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
52 match *self {
53 IpcError::Bincode(ref err) => write!(fmt, "bincode error: {}", err),
54 IpcError::Io(ref err) => write!(fmt, "io error: {}", err),
55 IpcError::Disconnected => write!(fmt, "disconnected"),
56 }
57 }
58}
59
60impl StdError for IpcError {
61 fn source(&self) -> Option<&(dyn StdError + 'static)> {
62 match *self {
63 IpcError::Bincode(ref err) => Some(err),
64 IpcError::Io(ref err) => Some(err),
65 IpcError::Disconnected => None,
66 }
67 }
68}
69
70#[derive(Debug)]
71pub enum TryRecvError {
72 IpcError(IpcError),
73 Empty,
74}
75
76impl fmt::Display for TryRecvError {
77 fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
78 match *self {
79 TryRecvError::IpcError(ref err) => write!(fmt, "ipc error: {}", err),
80 TryRecvError::Empty => write!(fmt, "empty"),
81 }
82 }
83}
84
85impl StdError for TryRecvError {
86 fn source(&self) -> Option<&(dyn StdError + 'static)> {
87 match *self {
88 TryRecvError::IpcError(ref err) => Some(err),
89 TryRecvError::Empty => None,
90 }
91 }
92}
93
94pub fn channel<T>() -> Result<(IpcSender<T>, IpcReceiver<T>), io::Error>
125where
126 T: for<'de> Deserialize<'de> + Serialize,
127{
128 let (os_sender, os_receiver) = platform::channel()?;
129 let ipc_receiver = IpcReceiver {
130 os_receiver,
131 phantom: PhantomData,
132 };
133 let ipc_sender = IpcSender {
134 os_sender,
135 phantom: PhantomData,
136 };
137 Ok((ipc_sender, ipc_receiver))
138}
139
140pub fn bytes_channel() -> Result<(IpcBytesSender, IpcBytesReceiver), io::Error> {
169 let (os_sender, os_receiver) = platform::channel()?;
170 let ipc_bytes_receiver = IpcBytesReceiver { os_receiver };
171 let ipc_bytes_sender = IpcBytesSender { os_sender };
172 Ok((ipc_bytes_sender, ipc_bytes_receiver))
173}
174
175#[derive(Debug)]
243pub struct IpcReceiver<T> {
244 os_receiver: OsIpcReceiver,
245 phantom: PhantomData<T>,
246}
247
248impl<T> IpcReceiver<T>
249where
250 T: for<'de> Deserialize<'de> + Serialize,
251{
252 pub fn recv(&self) -> Result<T, IpcError> {
254 self.os_receiver.recv()?.to().map_err(IpcError::Bincode)
255 }
256
257 pub fn try_recv(&self) -> Result<T, TryRecvError> {
259 self.os_receiver
260 .try_recv()?
261 .to()
262 .map_err(IpcError::Bincode)
263 .map_err(TryRecvError::IpcError)
264 }
265
266 pub fn try_recv_timeout(&self, duration: Duration) -> Result<T, TryRecvError> {
273 self.os_receiver
274 .try_recv_timeout(duration)?
275 .to()
276 .map_err(IpcError::Bincode)
277 .map_err(TryRecvError::IpcError)
278 }
279
280 pub fn to_opaque(self) -> OpaqueIpcReceiver {
284 OpaqueIpcReceiver {
285 os_receiver: self.os_receiver,
286 }
287 }
288}
289
290impl<'de, T> Deserialize<'de> for IpcReceiver<T> {
291 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
292 where
293 D: Deserializer<'de>,
294 {
295 let os_receiver = deserialize_os_ipc_receiver(deserializer)?;
296 Ok(IpcReceiver {
297 os_receiver,
298 phantom: PhantomData,
299 })
300 }
301}
302
303impl<T> Serialize for IpcReceiver<T> {
304 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
305 where
306 S: Serializer,
307 {
308 serialize_os_ipc_receiver(&self.os_receiver, serializer)
309 }
310}
311
312#[derive(Debug)]
333pub struct IpcSender<T> {
334 os_sender: OsIpcSender,
335 phantom: PhantomData<T>,
336}
337
338impl<T> Clone for IpcSender<T>
339where
340 T: Serialize,
341{
342 fn clone(&self) -> IpcSender<T> {
343 IpcSender {
344 os_sender: self.os_sender.clone(),
345 phantom: PhantomData,
346 }
347 }
348}
349
350impl<T> IpcSender<T>
351where
352 T: Serialize,
353{
354 pub fn connect(name: String) -> Result<IpcSender<T>, io::Error> {
359 Ok(IpcSender {
360 os_sender: OsIpcSender::connect(name)?,
361 phantom: PhantomData,
362 })
363 }
364
365 pub fn send(&self, data: T) -> Result<(), bincode::Error> {
367 let mut bytes = Vec::with_capacity(4096);
368 OS_IPC_CHANNELS_FOR_SERIALIZATION.with(|os_ipc_channels_for_serialization| {
369 OS_IPC_SHARED_MEMORY_REGIONS_FOR_SERIALIZATION.with(
370 |os_ipc_shared_memory_regions_for_serialization| {
371 let old_os_ipc_channels =
372 mem::take(&mut *os_ipc_channels_for_serialization.borrow_mut());
373 let old_os_ipc_shared_memory_regions = mem::take(
374 &mut *os_ipc_shared_memory_regions_for_serialization.borrow_mut(),
375 );
376 let os_ipc_shared_memory_regions;
377 let os_ipc_channels;
378 {
379 bincode::serialize_into(&mut bytes, &data)?;
380 os_ipc_channels = mem::replace(
381 &mut *os_ipc_channels_for_serialization.borrow_mut(),
382 old_os_ipc_channels,
383 );
384 os_ipc_shared_memory_regions = mem::replace(
385 &mut *os_ipc_shared_memory_regions_for_serialization.borrow_mut(),
386 old_os_ipc_shared_memory_regions,
387 );
388 };
389 Ok(self.os_sender.send(
390 &bytes[..],
391 os_ipc_channels,
392 os_ipc_shared_memory_regions,
393 )?)
394 },
395 )
396 })
397 }
398
399 pub fn to_opaque(self) -> OpaqueIpcSender {
400 OpaqueIpcSender {
401 os_sender: self.os_sender,
402 }
403 }
404}
405
406impl<'de, T> Deserialize<'de> for IpcSender<T> {
407 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
408 where
409 D: Deserializer<'de>,
410 {
411 let os_sender = deserialize_os_ipc_sender(deserializer)?;
412 Ok(IpcSender {
413 os_sender,
414 phantom: PhantomData,
415 })
416 }
417}
418
419impl<T> Serialize for IpcSender<T> {
420 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
421 where
422 S: Serializer,
423 {
424 serialize_os_ipc_sender(&self.os_sender, serializer)
425 }
426}
427
428pub struct IpcReceiverSet {
463 os_receiver_set: OsIpcReceiverSet,
464}
465
466impl IpcReceiverSet {
467 pub fn new() -> Result<IpcReceiverSet, io::Error> {
475 Ok(IpcReceiverSet {
476 os_receiver_set: OsIpcReceiverSet::new()?,
477 })
478 }
479
480 pub fn add<T>(&mut self, receiver: IpcReceiver<T>) -> Result<u64, io::Error>
483 where
484 T: for<'de> Deserialize<'de> + Serialize,
485 {
486 Ok(self.os_receiver_set.add(receiver.os_receiver)?)
487 }
488
489 pub fn add_opaque(&mut self, receiver: OpaqueIpcReceiver) -> Result<u64, io::Error> {
492 Ok(self.os_receiver_set.add(receiver.os_receiver)?)
493 }
494
495 pub fn select(&mut self) -> Result<Vec<IpcSelectionResult>, io::Error> {
501 let results = self.os_receiver_set.select()?;
502 Ok(results
503 .into_iter()
504 .map(|result| match result {
505 OsIpcSelectionResult::DataReceived(os_receiver_id, ipc_message) => {
506 IpcSelectionResult::MessageReceived(os_receiver_id, ipc_message)
507 },
508 OsIpcSelectionResult::ChannelClosed(os_receiver_id) => {
509 IpcSelectionResult::ChannelClosed(os_receiver_id)
510 },
511 })
512 .collect())
513 }
514}
515
516#[derive(Clone, Debug, PartialEq)]
530pub struct IpcSharedMemory {
531 os_shared_memory: Option<OsIpcSharedMemory>,
533}
534
535impl Deref for IpcSharedMemory {
536 type Target = [u8];
537
538 #[inline]
539 fn deref(&self) -> &[u8] {
540 if let Some(os_shared_memory) = &self.os_shared_memory {
541 os_shared_memory
542 } else {
543 &[]
544 }
545 }
546}
547
548impl<'de> Deserialize<'de> for IpcSharedMemory {
549 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
550 where
551 D: Deserializer<'de>,
552 {
553 let index: usize = Deserialize::deserialize(deserializer)?;
554 if index == usize::MAX {
555 return Ok(IpcSharedMemory::empty());
556 }
557
558 let os_shared_memory = OS_IPC_SHARED_MEMORY_REGIONS_FOR_DESERIALIZATION.with(
559 |os_ipc_shared_memory_regions_for_deserialization| {
560 os_ipc_shared_memory_regions_for_deserialization.borrow_mut()[index]
563 .take()
564 .unwrap()
565 },
566 );
567 Ok(IpcSharedMemory {
568 os_shared_memory: Some(os_shared_memory),
569 })
570 }
571}
572
573impl Serialize for IpcSharedMemory {
574 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
575 where
576 S: Serializer,
577 {
578 if let Some(os_shared_memory) = &self.os_shared_memory {
579 let index = OS_IPC_SHARED_MEMORY_REGIONS_FOR_SERIALIZATION.with(
580 |os_ipc_shared_memory_regions_for_serialization| {
581 let mut os_ipc_shared_memory_regions_for_serialization =
582 os_ipc_shared_memory_regions_for_serialization.borrow_mut();
583 let index = os_ipc_shared_memory_regions_for_serialization.len();
584 os_ipc_shared_memory_regions_for_serialization.push(os_shared_memory.clone());
585 index
586 },
587 );
588 debug_assert!(index < usize::MAX);
589 index
590 } else {
591 usize::MAX
592 }
593 .serialize(serializer)
594 }
595}
596
597impl IpcSharedMemory {
598 const fn empty() -> Self {
599 Self {
600 os_shared_memory: None,
601 }
602 }
603
604 pub fn from_bytes(bytes: &[u8]) -> IpcSharedMemory {
606 if bytes.is_empty() {
607 IpcSharedMemory::empty()
608 } else {
609 IpcSharedMemory {
610 os_shared_memory: Some(OsIpcSharedMemory::from_bytes(bytes)),
611 }
612 }
613 }
614
615 pub fn from_byte(byte: u8, length: usize) -> IpcSharedMemory {
618 if length == 0 {
619 IpcSharedMemory::empty()
620 } else {
621 IpcSharedMemory {
622 os_shared_memory: Some(OsIpcSharedMemory::from_byte(byte, length)),
623 }
624 }
625 }
626}
627
628pub enum IpcSelectionResult {
632 MessageReceived(u64, IpcMessage),
635 ChannelClosed(u64),
638}
639
640impl IpcSelectionResult {
641 pub fn unwrap(self) -> (u64, IpcMessage) {
652 match self {
653 IpcSelectionResult::MessageReceived(id, message) => (id, message),
654 IpcSelectionResult::ChannelClosed(id) => {
655 panic!("IpcSelectionResult::unwrap(): channel {} closed", id)
656 },
657 }
658 }
659}
660
661#[derive(PartialEq)]
667pub struct IpcMessage {
668 pub(crate) data: Vec<u8>,
669 pub(crate) os_ipc_channels: Vec<OsOpaqueIpcChannel>,
670 pub(crate) os_ipc_shared_memory_regions: Vec<OsIpcSharedMemory>,
671}
672
673impl IpcMessage {
674 pub fn from_data(data: Vec<u8>) -> Self {
677 Self {
678 data,
679 os_ipc_channels: vec![],
680 os_ipc_shared_memory_regions: vec![],
681 }
682 }
683}
684
685impl Debug for IpcMessage {
686 fn fmt(&self, formatter: &mut Formatter) -> Result<(), fmt::Error> {
687 match String::from_utf8(self.data.clone()) {
688 Ok(string) => string.chars().take(256).collect::<String>().fmt(formatter),
689 Err(..) => self.data[0..min(self.data.len(), 256)].fmt(formatter),
690 }
691 }
692}
693
694impl IpcMessage {
695 pub(crate) fn new(
696 data: Vec<u8>,
697 os_ipc_channels: Vec<OsOpaqueIpcChannel>,
698 os_ipc_shared_memory_regions: Vec<OsIpcSharedMemory>,
699 ) -> IpcMessage {
700 IpcMessage {
701 data,
702 os_ipc_channels,
703 os_ipc_shared_memory_regions,
704 }
705 }
706
707 pub fn to<T>(mut self) -> Result<T, bincode::Error>
709 where
710 T: for<'de> Deserialize<'de> + Serialize,
711 {
712 OS_IPC_CHANNELS_FOR_DESERIALIZATION.with(|os_ipc_channels_for_deserialization| {
713 OS_IPC_SHARED_MEMORY_REGIONS_FOR_DESERIALIZATION.with(
714 |os_ipc_shared_memory_regions_for_deserialization| {
715 mem::swap(
716 &mut *os_ipc_channels_for_deserialization.borrow_mut(),
717 &mut self.os_ipc_channels,
718 );
719 let old_ipc_shared_memory_regions_for_deserialization = mem::replace(
720 &mut *os_ipc_shared_memory_regions_for_deserialization.borrow_mut(),
721 self.os_ipc_shared_memory_regions
722 .into_iter()
723 .map(Some)
724 .collect(),
725 );
726 let result = bincode::deserialize(&self.data[..]);
727 *os_ipc_shared_memory_regions_for_deserialization.borrow_mut() =
728 old_ipc_shared_memory_regions_for_deserialization;
729 mem::swap(
730 &mut *os_ipc_channels_for_deserialization.borrow_mut(),
731 &mut self.os_ipc_channels,
732 );
733 result
736 },
737 )
738 })
739 }
740}
741
742#[derive(Clone, Debug)]
743pub struct OpaqueIpcSender {
744 os_sender: OsIpcSender,
745}
746
747impl OpaqueIpcSender {
748 pub fn to<'de, T>(self) -> IpcSender<T>
749 where
750 T: Deserialize<'de> + Serialize,
751 {
752 IpcSender {
753 os_sender: self.os_sender,
754 phantom: PhantomData,
755 }
756 }
757}
758
759impl<'de> Deserialize<'de> for OpaqueIpcSender {
760 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
761 where
762 D: Deserializer<'de>,
763 {
764 let os_sender = deserialize_os_ipc_sender(deserializer)?;
765 Ok(OpaqueIpcSender { os_sender })
766 }
767}
768
769impl Serialize for OpaqueIpcSender {
770 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
771 where
772 S: Serializer,
773 {
774 serialize_os_ipc_sender(&self.os_sender, serializer)
775 }
776}
777
778#[derive(Debug)]
779pub struct OpaqueIpcReceiver {
780 os_receiver: OsIpcReceiver,
781}
782
783impl OpaqueIpcReceiver {
784 pub fn to<'de, T>(self) -> IpcReceiver<T>
785 where
786 T: Deserialize<'de> + Serialize,
787 {
788 IpcReceiver {
789 os_receiver: self.os_receiver,
790 phantom: PhantomData,
791 }
792 }
793}
794
795impl<'de> Deserialize<'de> for OpaqueIpcReceiver {
796 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
797 where
798 D: Deserializer<'de>,
799 {
800 let os_receiver = deserialize_os_ipc_receiver(deserializer)?;
801 Ok(OpaqueIpcReceiver { os_receiver })
802 }
803}
804
805impl Serialize for OpaqueIpcReceiver {
806 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
807 where
808 S: Serializer,
809 {
810 serialize_os_ipc_receiver(&self.os_receiver, serializer)
811 }
812}
813
814pub struct IpcOneShotServer<T> {
848 os_server: OsIpcOneShotServer,
849 phantom: PhantomData<T>,
850}
851
852impl<T> IpcOneShotServer<T>
853where
854 T: for<'de> Deserialize<'de> + Serialize,
855{
856 pub fn new() -> Result<(IpcOneShotServer<T>, String), io::Error> {
857 let (os_server, name) = OsIpcOneShotServer::new()?;
858 Ok((
859 IpcOneShotServer {
860 os_server,
861 phantom: PhantomData,
862 },
863 name,
864 ))
865 }
866
867 pub fn accept(self) -> Result<(IpcReceiver<T>, T), bincode::Error> {
868 let (os_receiver, ipc_message) = self.os_server.accept()?;
869 Ok((
870 IpcReceiver {
871 os_receiver,
872 phantom: PhantomData,
873 },
874 ipc_message.to()?,
875 ))
876 }
877}
878
879#[derive(Debug)]
881pub struct IpcBytesReceiver {
882 os_receiver: OsIpcReceiver,
883}
884
885impl IpcBytesReceiver {
886 #[inline]
888 pub fn recv(&self) -> Result<Vec<u8>, IpcError> {
889 match self.os_receiver.recv() {
890 Ok(ipc_message) => Ok(ipc_message.data),
891 Err(err) => Err(err.into()),
892 }
893 }
894
895 pub fn try_recv(&self) -> Result<Vec<u8>, TryRecvError> {
897 match self.os_receiver.try_recv() {
898 Ok(ipc_message) => Ok(ipc_message.data),
899 Err(err) => Err(err.into()),
900 }
901 }
902}
903
904impl<'de> Deserialize<'de> for IpcBytesReceiver {
905 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
906 where
907 D: Deserializer<'de>,
908 {
909 let os_receiver = deserialize_os_ipc_receiver(deserializer)?;
910 Ok(IpcBytesReceiver { os_receiver })
911 }
912}
913
914impl Serialize for IpcBytesReceiver {
915 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
916 where
917 S: Serializer,
918 {
919 serialize_os_ipc_receiver(&self.os_receiver, serializer)
920 }
921}
922
923#[derive(Debug)]
925pub struct IpcBytesSender {
926 os_sender: OsIpcSender,
927}
928
929impl Clone for IpcBytesSender {
930 fn clone(&self) -> IpcBytesSender {
931 IpcBytesSender {
932 os_sender: self.os_sender.clone(),
933 }
934 }
935}
936
937impl<'de> Deserialize<'de> for IpcBytesSender {
938 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
939 where
940 D: Deserializer<'de>,
941 {
942 let os_sender = deserialize_os_ipc_sender(deserializer)?;
943 Ok(IpcBytesSender { os_sender })
944 }
945}
946
947impl Serialize for IpcBytesSender {
948 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
949 where
950 S: Serializer,
951 {
952 serialize_os_ipc_sender(&self.os_sender, serializer)
953 }
954}
955
956impl IpcBytesSender {
957 #[inline]
958 pub fn send(&self, data: &[u8]) -> Result<(), io::Error> {
959 self.os_sender
960 .send(data, vec![], vec![])
961 .map_err(io::Error::from)
962 }
963}
964
965fn serialize_os_ipc_sender<S>(os_ipc_sender: &OsIpcSender, serializer: S) -> Result<S::Ok, S::Error>
966where
967 S: Serializer,
968{
969 let index = OS_IPC_CHANNELS_FOR_SERIALIZATION.with(|os_ipc_channels_for_serialization| {
970 let mut os_ipc_channels_for_serialization = os_ipc_channels_for_serialization.borrow_mut();
971 let index = os_ipc_channels_for_serialization.len();
972 os_ipc_channels_for_serialization.push(OsIpcChannel::Sender(os_ipc_sender.clone()));
973 index
974 });
975 index.serialize(serializer)
976}
977
978fn deserialize_os_ipc_sender<'de, D>(deserializer: D) -> Result<OsIpcSender, D::Error>
979where
980 D: Deserializer<'de>,
981{
982 let index: usize = Deserialize::deserialize(deserializer)?;
983 OS_IPC_CHANNELS_FOR_DESERIALIZATION.with(|os_ipc_channels_for_deserialization| {
984 Ok(os_ipc_channels_for_deserialization.borrow_mut()[index].to_sender())
987 })
988}
989
990fn serialize_os_ipc_receiver<S>(
991 os_receiver: &OsIpcReceiver,
992 serializer: S,
993) -> Result<S::Ok, S::Error>
994where
995 S: Serializer,
996{
997 let index = OS_IPC_CHANNELS_FOR_SERIALIZATION.with(|os_ipc_channels_for_serialization| {
998 let mut os_ipc_channels_for_serialization = os_ipc_channels_for_serialization.borrow_mut();
999 let index = os_ipc_channels_for_serialization.len();
1000 os_ipc_channels_for_serialization.push(OsIpcChannel::Receiver(os_receiver.consume()));
1001 index
1002 });
1003 index.serialize(serializer)
1004}
1005
1006fn deserialize_os_ipc_receiver<'de, D>(deserializer: D) -> Result<OsIpcReceiver, D::Error>
1007where
1008 D: Deserializer<'de>,
1009{
1010 let index: usize = Deserialize::deserialize(deserializer)?;
1011
1012 OS_IPC_CHANNELS_FOR_DESERIALIZATION.with(|os_ipc_channels_for_deserialization| {
1013 Ok(os_ipc_channels_for_deserialization.borrow_mut()[index].to_receiver())
1016 })
1017}