1use crate::error::SerDeError;
11use crate::platform::{self, OsIpcChannel, OsIpcReceiver, OsIpcReceiverSet, OsIpcSender};
12use crate::platform::{
13 OsIpcOneShotServer, OsIpcSelectionResult, OsIpcSharedMemory, OsOpaqueIpcChannel,
14 OsTrySelectError,
15};
16use crate::{IpcError, TryRecvError, TrySelectError};
17
18use serde_core::{de::Error, Deserialize, Deserializer, Serialize, Serializer};
19use std::cell::RefCell;
20use std::cmp::min;
21use std::fmt::{self, Debug, Formatter};
22use std::io;
23use std::marker::PhantomData;
24use std::ops::Deref;
25use std::time::Duration;
26
thread_local! {
    // Per-thread table of OS channels attached to the message currently being decoded.
    // `IpcMessage::to` fills it before deserializing and empties it afterwards; the
    // `deserialize_os_ipc_*` helpers look channels up in it by index.
    static OS_IPC_CHANNELS_FOR_DESERIALIZATION: RefCell<Vec<OsOpaqueIpcChannel>> =
        const { RefCell::new(Vec::new()) };

    // Per-thread table of shared memory regions attached to the message currently being
    // decoded. Each slot is `Some` until `IpcSharedMemory::deserialize` takes it.
    static OS_IPC_SHARED_MEMORY_REGIONS_FOR_DESERIALIZATION:
        RefCell<Vec<Option<OsIpcSharedMemory>>> = const { RefCell::new(Vec::new()) };

    // Per-thread list of OS channels registered while a value is being serialized; the
    // position of each entry is the index written into the serialized bytes.
    static OS_IPC_CHANNELS_FOR_SERIALIZATION: RefCell<Vec<OsIpcChannel>> = const { RefCell::new(Vec::new()) };

    // Per-thread list of shared memory regions registered while a value is being serialized;
    // `IpcSender::send` drains it and hands the regions to the OS sender.
    static OS_IPC_SHARED_MEMORY_REGIONS_FOR_SERIALIZATION: RefCell<Vec<OsIpcSharedMemory>> =
        const { RefCell::new(Vec::new()) }
}
39
40pub fn channel<T>() -> Result<(IpcSender<T>, IpcReceiver<T>), io::Error>
71where
72 T: for<'de> Deserialize<'de> + Serialize,
73{
74 let (os_sender, os_receiver) = platform::channel()?;
75 let ipc_receiver = IpcReceiver {
76 os_receiver,
77 phantom: PhantomData,
78 };
79 let ipc_sender = IpcSender {
80 os_sender,
81 phantom: PhantomData,
82 };
83 Ok((ipc_sender, ipc_receiver))
84}
85
86pub fn bytes_channel() -> Result<(IpcBytesSender, IpcBytesReceiver), io::Error> {
115 let (os_sender, os_receiver) = platform::channel()?;
116 let ipc_bytes_receiver = IpcBytesReceiver { os_receiver };
117 let ipc_bytes_sender = IpcBytesSender { os_sender };
118 Ok((ipc_bytes_sender, ipc_bytes_receiver))
119}
120
/// Receiving end of an IPC channel whose messages are decoded into values of type `T`.
#[derive(Debug)]
pub struct IpcReceiver<T> {
    // Underlying platform receiver that delivers raw `IpcMessage`s.
    os_receiver: OsIpcReceiver,
    // Zero-sized marker recording the message type `T`.
    phantom: PhantomData<T>,
}
193
194impl<T> IpcReceiver<T>
195where
196 T: for<'de> Deserialize<'de> + Serialize,
197{
198 pub fn recv(&self) -> Result<T, IpcError> {
200 self.os_receiver
201 .recv()?
202 .to()
203 .map_err(IpcError::SerializationError)
204 }
205
206 pub fn try_recv(&self) -> Result<T, TryRecvError> {
208 self.os_receiver
209 .try_recv()?
210 .to()
211 .map_err(IpcError::SerializationError)
212 .map_err(TryRecvError::IpcError)
213 }
214
215 pub fn try_recv_timeout(&self, duration: Duration) -> Result<T, TryRecvError> {
222 self.os_receiver
223 .try_recv_timeout(duration)?
224 .to()
225 .map_err(IpcError::SerializationError)
226 .map_err(TryRecvError::IpcError)
227 }
228
229 pub fn to_opaque(self) -> OpaqueIpcReceiver {
233 OpaqueIpcReceiver {
234 os_receiver: self.os_receiver,
235 }
236 }
237}
238
239impl<'de, T> Deserialize<'de> for IpcReceiver<T> {
240 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
241 where
242 D: Deserializer<'de>,
243 {
244 let os_receiver = deserialize_os_ipc_receiver(deserializer)?;
245 Ok(IpcReceiver {
246 os_receiver,
247 phantom: PhantomData,
248 })
249 }
250}
251
252impl<T> Serialize for IpcReceiver<T> {
253 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
254 where
255 S: Serializer,
256 {
257 serialize_os_ipc_receiver(&self.os_receiver, serializer)
258 }
259}
260
/// Sending end of an IPC channel that carries serialized values of type `T`.
#[derive(Debug)]
pub struct IpcSender<T> {
    // Underlying platform sender that transmits the serialized bytes and attached handles.
    os_sender: OsIpcSender,
    // Zero-sized marker recording the message type `T`.
    phantom: PhantomData<T>,
}
286
287impl<T> Clone for IpcSender<T>
288where
289 T: Serialize,
290{
291 fn clone(&self) -> IpcSender<T> {
292 IpcSender {
293 os_sender: self.os_sender.clone(),
294 phantom: PhantomData,
295 }
296 }
297}
298
299impl<T> IpcSender<T>
300where
301 T: Serialize,
302{
303 pub fn connect(name: String) -> Result<IpcSender<T>, io::Error> {
312 Ok(IpcSender {
313 os_sender: OsIpcSender::connect(name)?,
314 phantom: PhantomData,
315 })
316 }
317
318 pub fn send(&self, data: T) -> Result<(), IpcError> {
320 OS_IPC_CHANNELS_FOR_SERIALIZATION.with(|os_ipc_channels_for_serialization| {
321 OS_IPC_SHARED_MEMORY_REGIONS_FOR_SERIALIZATION.with(
322 |os_ipc_shared_memory_regions_for_serialization| {
323 let bytes = postcard::to_stdvec(&data).map_err(SerDeError)?;
324 let os_ipc_channels = os_ipc_channels_for_serialization.take();
325 let os_ipc_shared_memory_regions =
326 os_ipc_shared_memory_regions_for_serialization.take();
327 Ok(self.os_sender.send(
328 &bytes[..],
329 os_ipc_channels,
330 os_ipc_shared_memory_regions,
331 )?)
332 },
333 )
334 })
335 }
336
337 pub fn to_opaque(self) -> OpaqueIpcSender {
338 OpaqueIpcSender {
339 os_sender: self.os_sender,
340 }
341 }
342}
343
344impl<'de, T> Deserialize<'de> for IpcSender<T> {
345 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
346 where
347 D: Deserializer<'de>,
348 {
349 let os_sender = deserialize_os_ipc_sender(deserializer)?;
350 Ok(IpcSender {
351 os_sender,
352 phantom: PhantomData,
353 })
354 }
355}
356
357impl<T> Serialize for IpcSender<T> {
358 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
359 where
360 S: Serializer,
361 {
362 serialize_os_ipc_sender(&self.os_sender, serializer)
363 }
364}
365
/// A collection of receivers that can be polled together through `select`,
/// `try_select` and `try_select_timeout`.
pub struct IpcReceiverSet {
    // Platform receiver set that performs the actual selection.
    os_receiver_set: OsIpcReceiverSet,
}
403
404impl IpcReceiverSet {
405 pub fn new() -> Result<IpcReceiverSet, io::Error> {
413 Ok(IpcReceiverSet {
414 os_receiver_set: OsIpcReceiverSet::new()?,
415 })
416 }
417
418 pub fn add<T>(&mut self, receiver: IpcReceiver<T>) -> Result<u64, io::Error>
421 where
422 T: for<'de> Deserialize<'de> + Serialize,
423 {
424 Ok(self.os_receiver_set.add(receiver.os_receiver)?)
425 }
426
427 pub fn add_opaque(&mut self, receiver: OpaqueIpcReceiver) -> Result<u64, io::Error> {
430 Ok(self.os_receiver_set.add(receiver.os_receiver)?)
431 }
432
433 pub fn select(&mut self) -> Result<Vec<IpcSelectionResult>, io::Error> {
439 let results = self.os_receiver_set.select()?;
440 Ok(results
441 .into_iter()
442 .map(|result| match result {
443 OsIpcSelectionResult::DataReceived(os_receiver_id, ipc_message) => {
444 IpcSelectionResult::MessageReceived(os_receiver_id, ipc_message)
445 },
446 OsIpcSelectionResult::ChannelClosed(os_receiver_id) => {
447 IpcSelectionResult::ChannelClosed(os_receiver_id)
448 },
449 })
450 .collect())
451 }
452
453 pub fn try_select(&mut self) -> Result<Vec<IpcSelectionResult>, TrySelectError> {
462 let results: Vec<OsIpcSelectionResult> =
463 self.os_receiver_set.try_select().map_err(|e| match e {
464 OsTrySelectError::IoError(e) => TrySelectError::IoError(e.into()),
465 OsTrySelectError::Empty => TrySelectError::Empty,
466 })?;
467 let results = results
468 .into_iter()
469 .map(|result| match result {
470 OsIpcSelectionResult::DataReceived(os_receiver_id, ipc_message) => {
471 IpcSelectionResult::MessageReceived(os_receiver_id, ipc_message)
472 },
473 OsIpcSelectionResult::ChannelClosed(os_receiver_id) => {
474 IpcSelectionResult::ChannelClosed(os_receiver_id)
475 },
476 })
477 .collect::<Vec<IpcSelectionResult>>();
478 Ok(results)
479 }
480
481 pub fn try_select_timeout(
498 &mut self,
499 duration: Duration,
500 ) -> Result<Vec<IpcSelectionResult>, TrySelectError> {
501 let results = self
502 .os_receiver_set
503 .try_select_timeout(duration)
504 .map_err(|e| match e {
505 OsTrySelectError::IoError(e) => TrySelectError::IoError(e.into()),
506 OsTrySelectError::Empty => TrySelectError::Empty,
507 })?;
508
509 let results = results
510 .into_iter()
511 .map(|result| match result {
512 OsIpcSelectionResult::DataReceived(os_receiver_id, ipc_message) => {
513 IpcSelectionResult::MessageReceived(os_receiver_id, ipc_message)
514 },
515 OsIpcSelectionResult::ChannelClosed(os_receiver_id) => {
516 IpcSelectionResult::ChannelClosed(os_receiver_id)
517 },
518 })
519 .collect::<Vec<IpcSelectionResult>>();
520 if results.is_empty() {
521 Err(TrySelectError::Empty)
522 } else {
523 Ok(results)
524 }
525 }
526}
527
/// A byte region that is transferred as a shared memory handle instead of being copied into
/// the serialized message body.
#[derive(Clone, Debug, PartialEq)]
pub struct IpcSharedMemory {
    // `None` represents the empty region; it dereferences to an empty slice and is
    // serialized as the sentinel index `usize::MAX`.
    os_shared_memory: Option<OsIpcSharedMemory>,
}
546
547impl Deref for IpcSharedMemory {
548 type Target = [u8];
549
550 #[inline]
551 fn deref(&self) -> &[u8] {
552 if let Some(os_shared_memory) = &self.os_shared_memory {
553 os_shared_memory
554 } else {
555 &[]
556 }
557 }
558}
559
560impl IpcSharedMemory {
561 #[inline]
569 pub unsafe fn deref_mut(&mut self) -> &mut [u8] {
570 if let Some(os_shared_memory) = &mut self.os_shared_memory {
571 os_shared_memory.deref_mut()
572 } else {
573 &mut []
574 }
575 }
576}
577
578impl<'de> Deserialize<'de> for IpcSharedMemory {
579 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
580 where
581 D: Deserializer<'de>,
582 {
583 let index: usize = Deserialize::deserialize(deserializer)?;
584 if index == usize::MAX {
585 return Ok(IpcSharedMemory::empty());
586 }
587
588 let os_shared_memory = OS_IPC_SHARED_MEMORY_REGIONS_FOR_DESERIALIZATION.with(
589 |os_ipc_shared_memory_regions_for_deserialization| {
590 let mut regions = os_ipc_shared_memory_regions_for_deserialization.borrow_mut();
591 let Some(region) = regions.get_mut(index) else {
592 return Err(format!("Cannot consume shared memory region at index {index}, there are only {} regions available", regions.len()));
593 };
594
595 region.take().ok_or_else(|| format!("Shared memory region {index} has already been consumed"))
596 },
597 ).map_err(D::Error::custom)?;
598
599 Ok(IpcSharedMemory {
600 os_shared_memory: Some(os_shared_memory),
601 })
602 }
603}
604
605impl Serialize for IpcSharedMemory {
606 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
607 where
608 S: Serializer,
609 {
610 if let Some(os_shared_memory) = &self.os_shared_memory {
611 let index = OS_IPC_SHARED_MEMORY_REGIONS_FOR_SERIALIZATION.with(
612 |os_ipc_shared_memory_regions_for_serialization| {
613 let mut os_ipc_shared_memory_regions_for_serialization =
614 os_ipc_shared_memory_regions_for_serialization.borrow_mut();
615 let index = os_ipc_shared_memory_regions_for_serialization.len();
616 os_ipc_shared_memory_regions_for_serialization.push(os_shared_memory.clone());
617 index
618 },
619 );
620 debug_assert!(index < usize::MAX);
621 index
622 } else {
623 usize::MAX
624 }
625 .serialize(serializer)
626 }
627}
628
629impl IpcSharedMemory {
630 const fn empty() -> Self {
631 Self {
632 os_shared_memory: None,
633 }
634 }
635
636 pub fn from_bytes(bytes: &[u8]) -> IpcSharedMemory {
638 if bytes.is_empty() {
639 IpcSharedMemory::empty()
640 } else {
641 IpcSharedMemory {
642 os_shared_memory: Some(OsIpcSharedMemory::from_bytes(bytes)),
643 }
644 }
645 }
646
647 pub fn from_byte(byte: u8, length: usize) -> IpcSharedMemory {
650 if length == 0 {
651 IpcSharedMemory::empty()
652 } else {
653 IpcSharedMemory {
654 os_shared_memory: Some(OsIpcSharedMemory::from_byte(byte, length)),
655 }
656 }
657 }
658
659 pub fn take(mut self) -> Option<Vec<u8>> {
663 if let Some(os_shared_memory) = self.os_shared_memory.take() {
664 os_shared_memory.take()
665 } else {
666 Some(vec![])
668 }
669 }
670}
671
/// One event reported by an `IpcReceiverSet` selection call.
#[derive(Debug)]
pub enum IpcSelectionResult {
    /// A message arrived; carries the receiver id reported by the OS set and the message.
    MessageReceived(u64, IpcMessage),
    /// A channel was closed; carries the receiver id reported by the OS set.
    ChannelClosed(u64),
}
684
685impl IpcSelectionResult {
686 pub fn unwrap(self) -> (u64, IpcMessage) {
697 match self {
698 IpcSelectionResult::MessageReceived(id, message) => (id, message),
699 IpcSelectionResult::ChannelClosed(id) => {
700 panic!("IpcSelectionResult::unwrap(): channel {id} closed")
701 },
702 }
703 }
704}
705
/// A raw received message: the serialized payload plus the OS handles that arrived with it.
pub struct IpcMessage {
    // Serialized payload bytes; `IpcMessage::to` decodes them with postcard.
    pub(crate) data: Vec<u8>,
    // Channels received with the message, looked up by index during deserialization.
    pub(crate) os_ipc_channels: Vec<OsOpaqueIpcChannel>,
    // Shared memory regions received with the message, looked up by index during
    // deserialization.
    pub(crate) os_ipc_shared_memory_regions: Vec<OsIpcSharedMemory>,
}
716
717impl IpcMessage {
718 pub fn from_data(data: Vec<u8>) -> Self {
721 Self {
722 data,
723 os_ipc_channels: vec![],
724 os_ipc_shared_memory_regions: vec![],
725 }
726 }
727}
728
729impl Debug for IpcMessage {
730 fn fmt(&self, formatter: &mut Formatter) -> Result<(), fmt::Error> {
731 match String::from_utf8(self.data.clone()) {
732 Ok(string) => string.chars().take(256).collect::<String>().fmt(formatter),
733 Err(..) => self.data[0..min(self.data.len(), 256)].fmt(formatter),
734 }
735 }
736}
737
738impl IpcMessage {
739 pub(crate) fn new(
740 data: Vec<u8>,
741 os_ipc_channels: Vec<OsOpaqueIpcChannel>,
742 os_ipc_shared_memory_regions: Vec<OsIpcSharedMemory>,
743 ) -> IpcMessage {
744 IpcMessage {
745 data,
746 os_ipc_channels,
747 os_ipc_shared_memory_regions,
748 }
749 }
750
751 pub fn to<T>(self) -> Result<T, SerDeError>
753 where
754 T: for<'de> Deserialize<'de> + Serialize,
755 {
756 OS_IPC_CHANNELS_FOR_DESERIALIZATION.with(|os_ipc_channels_for_deserialization| {
757 OS_IPC_SHARED_MEMORY_REGIONS_FOR_DESERIALIZATION.with(
758 |os_ipc_shared_memory_regions_for_deserialization| {
759 *os_ipc_channels_for_deserialization.borrow_mut() = self.os_ipc_channels;
761 *os_ipc_shared_memory_regions_for_deserialization.borrow_mut() = self
762 .os_ipc_shared_memory_regions
763 .into_iter()
764 .map(Some)
765 .collect();
766
767 let result = postcard::from_bytes(&self.data).map_err(|e| e.into());
768
769 let _ = os_ipc_shared_memory_regions_for_deserialization.take();
771 let _ = os_ipc_channels_for_deserialization.take();
772
773 result
776 },
777 )
778 })
779 }
780}
781
/// A sender whose message type has been erased; `to` turns it into a typed `IpcSender`.
#[derive(Clone, Debug)]
pub struct OpaqueIpcSender {
    // Underlying platform sender.
    os_sender: OsIpcSender,
}
786
787impl OpaqueIpcSender {
788 pub fn to<'de, T>(self) -> IpcSender<T>
789 where
790 T: Deserialize<'de> + Serialize,
791 {
792 IpcSender {
793 os_sender: self.os_sender,
794 phantom: PhantomData,
795 }
796 }
797}
798
799impl<'de> Deserialize<'de> for OpaqueIpcSender {
800 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
801 where
802 D: Deserializer<'de>,
803 {
804 let os_sender = deserialize_os_ipc_sender(deserializer)?;
805 Ok(OpaqueIpcSender { os_sender })
806 }
807}
808
809impl Serialize for OpaqueIpcSender {
810 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
811 where
812 S: Serializer,
813 {
814 serialize_os_ipc_sender(&self.os_sender, serializer)
815 }
816}
817
/// A receiver whose message type has been erased; `to` turns it into a typed `IpcReceiver`,
/// and `IpcReceiverSet::add_opaque` accepts it directly.
#[derive(Debug)]
pub struct OpaqueIpcReceiver {
    // Underlying platform receiver.
    os_receiver: OsIpcReceiver,
}
822
823impl OpaqueIpcReceiver {
824 pub fn to<'de, T>(self) -> IpcReceiver<T>
825 where
826 T: Deserialize<'de> + Serialize,
827 {
828 IpcReceiver {
829 os_receiver: self.os_receiver,
830 phantom: PhantomData,
831 }
832 }
833}
834
835impl<'de> Deserialize<'de> for OpaqueIpcReceiver {
836 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
837 where
838 D: Deserializer<'de>,
839 {
840 let os_receiver = deserialize_os_ipc_receiver(deserializer)?;
841 Ok(OpaqueIpcReceiver { os_receiver })
842 }
843}
844
845impl Serialize for OpaqueIpcReceiver {
846 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
847 where
848 S: Serializer,
849 {
850 serialize_os_ipc_receiver(&self.os_receiver, serializer)
851 }
852}
853
/// A named server whose `accept` consumes it, yielding a receiver and the first message of
/// type `T`.
pub struct IpcOneShotServer<T> {
    // Underlying platform one-shot server.
    os_server: OsIpcOneShotServer,
    // Zero-sized marker recording the message type `T`.
    phantom: PhantomData<T>,
}
892
893impl<T> IpcOneShotServer<T>
894where
895 T: for<'de> Deserialize<'de> + Serialize,
896{
897 pub fn new() -> Result<(IpcOneShotServer<T>, String), io::Error> {
898 let (os_server, name) = OsIpcOneShotServer::new()?;
899 Ok((
900 IpcOneShotServer {
901 os_server,
902 phantom: PhantomData,
903 },
904 name,
905 ))
906 }
907
908 pub fn accept(self) -> Result<(IpcReceiver<T>, T), IpcError> {
909 let (os_receiver, ipc_message) = self.os_server.accept()?;
910 Ok((
911 IpcReceiver {
912 os_receiver,
913 phantom: PhantomData,
914 },
915 ipc_message.to()?,
916 ))
917 }
918}
919
/// Receiving end of a channel that delivers raw byte payloads without deserialization.
#[derive(Debug)]
pub struct IpcBytesReceiver {
    // Underlying platform receiver.
    os_receiver: OsIpcReceiver,
}
925
926impl IpcBytesReceiver {
927 #[inline]
929 pub fn recv(&self) -> Result<Vec<u8>, IpcError> {
930 match self.os_receiver.recv() {
931 Ok(ipc_message) => Ok(ipc_message.data),
932 Err(err) => Err(err.into()),
933 }
934 }
935
936 pub fn try_recv(&self) -> Result<Vec<u8>, TryRecvError> {
938 match self.os_receiver.try_recv() {
939 Ok(ipc_message) => Ok(ipc_message.data),
940 Err(err) => Err(err.into()),
941 }
942 }
943}
944
945impl<'de> Deserialize<'de> for IpcBytesReceiver {
946 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
947 where
948 D: Deserializer<'de>,
949 {
950 let os_receiver = deserialize_os_ipc_receiver(deserializer)?;
951 Ok(IpcBytesReceiver { os_receiver })
952 }
953}
954
955impl Serialize for IpcBytesReceiver {
956 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
957 where
958 S: Serializer,
959 {
960 serialize_os_ipc_receiver(&self.os_receiver, serializer)
961 }
962}
963
/// Sending end of a channel that transmits raw byte payloads without serialization.
#[derive(Debug)]
pub struct IpcBytesSender {
    // Underlying platform sender.
    os_sender: OsIpcSender,
}
969
970impl Clone for IpcBytesSender {
971 fn clone(&self) -> IpcBytesSender {
972 IpcBytesSender {
973 os_sender: self.os_sender.clone(),
974 }
975 }
976}
977
978impl<'de> Deserialize<'de> for IpcBytesSender {
979 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
980 where
981 D: Deserializer<'de>,
982 {
983 let os_sender = deserialize_os_ipc_sender(deserializer)?;
984 Ok(IpcBytesSender { os_sender })
985 }
986}
987
988impl Serialize for IpcBytesSender {
989 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
990 where
991 S: Serializer,
992 {
993 serialize_os_ipc_sender(&self.os_sender, serializer)
994 }
995}
996
997impl IpcBytesSender {
998 #[inline]
999 pub fn send(&self, data: &[u8]) -> Result<(), io::Error> {
1000 self.os_sender
1001 .send(data, vec![], vec![])
1002 .map_err(io::Error::from)
1003 }
1004}
1005
1006fn serialize_os_ipc_sender<S>(os_ipc_sender: &OsIpcSender, serializer: S) -> Result<S::Ok, S::Error>
1007where
1008 S: Serializer,
1009{
1010 let index = OS_IPC_CHANNELS_FOR_SERIALIZATION.with(|os_ipc_channels_for_serialization| {
1011 let mut os_ipc_channels_for_serialization = os_ipc_channels_for_serialization.borrow_mut();
1012 let index = os_ipc_channels_for_serialization.len();
1013 os_ipc_channels_for_serialization.push(OsIpcChannel::Sender(os_ipc_sender.clone()));
1014 index
1015 });
1016 index.serialize(serializer)
1017}
1018
1019fn deserialize_os_ipc_sender<'de, D>(deserializer: D) -> Result<OsIpcSender, D::Error>
1020where
1021 D: Deserializer<'de>,
1022{
1023 let index: usize = Deserialize::deserialize(deserializer)?;
1024 OS_IPC_CHANNELS_FOR_DESERIALIZATION.with(|os_ipc_channels_for_deserialization| {
1025 Ok(os_ipc_channels_for_deserialization.borrow_mut()[index].to_sender())
1028 })
1029}
1030
1031fn serialize_os_ipc_receiver<S>(
1032 os_receiver: &OsIpcReceiver,
1033 serializer: S,
1034) -> Result<S::Ok, S::Error>
1035where
1036 S: Serializer,
1037{
1038 let index = OS_IPC_CHANNELS_FOR_SERIALIZATION.with(|os_ipc_channels_for_serialization| {
1039 let mut os_ipc_channels_for_serialization = os_ipc_channels_for_serialization.borrow_mut();
1040 let index = os_ipc_channels_for_serialization.len();
1041 os_ipc_channels_for_serialization.push(OsIpcChannel::Receiver(os_receiver.consume()));
1042 index
1043 });
1044 index.serialize(serializer)
1045}
1046
1047fn deserialize_os_ipc_receiver<'de, D>(deserializer: D) -> Result<OsIpcReceiver, D::Error>
1048where
1049 D: Deserializer<'de>,
1050{
1051 let index: usize = Deserialize::deserialize(deserializer)?;
1052
1053 OS_IPC_CHANNELS_FOR_DESERIALIZATION.with(|os_ipc_channels_for_deserialization| {
1054 Ok(os_ipc_channels_for_deserialization.borrow_mut()[index].to_receiver())
1057 })
1058}