ipc_channel/
ipc.rs

1// Copyright 2015 The Servo Project Developers. See the COPYRIGHT
2// file at the top-level directory of this distribution.
3//
4// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
5// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
6// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
7// option. This file may not be copied, modified, or distributed
8// except according to those terms.
9
10use crate::platform::{self, OsIpcChannel, OsIpcReceiver, OsIpcReceiverSet, OsIpcSender};
11use crate::platform::{
12    OsIpcOneShotServer, OsIpcSelectionResult, OsIpcSharedMemory, OsOpaqueIpcChannel,
13};
14
15use bincode;
16use serde::{Deserialize, Deserializer, Serialize, Serializer};
17use std::cell::RefCell;
18use std::cmp::min;
19use std::error::Error as StdError;
20use std::fmt::{self, Debug, Formatter};
21use std::io;
22use std::marker::PhantomData;
23use std::mem;
24use std::ops::Deref;
25use std::time::Duration;
26
27thread_local! {
28    static OS_IPC_CHANNELS_FOR_DESERIALIZATION: RefCell<Vec<OsOpaqueIpcChannel>> =
29        const { RefCell::new(Vec::new()) }
30}
31thread_local! {
32    static OS_IPC_SHARED_MEMORY_REGIONS_FOR_DESERIALIZATION:
33        RefCell<Vec<Option<OsIpcSharedMemory>>> = const { RefCell::new(Vec::new()) }
34}
35thread_local! {
36    static OS_IPC_CHANNELS_FOR_SERIALIZATION: RefCell<Vec<OsIpcChannel>> = const { RefCell::new(Vec::new()) }
37}
38thread_local! {
39    static OS_IPC_SHARED_MEMORY_REGIONS_FOR_SERIALIZATION: RefCell<Vec<OsIpcSharedMemory>> =
40        const { RefCell::new(Vec::new()) }
41}
42
/// Error returned by the receive operations in this module.
#[derive(Debug)]
pub enum IpcError {
    /// A `bincode` error; the typed receivers wrap deserialization failures in this variant.
    Bincode(bincode::Error),
    /// An I/O error.
    Io(io::Error),
    /// Rendered as `disconnected` by `Display`. Presumably the other end of the channel is
    /// gone; the value is produced by the platform layer, so confirm the exact meaning there.
    Disconnected,
}
49
50impl fmt::Display for IpcError {
51    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
52        match *self {
53            IpcError::Bincode(ref err) => write!(fmt, "bincode error: {}", err),
54            IpcError::Io(ref err) => write!(fmt, "io error: {}", err),
55            IpcError::Disconnected => write!(fmt, "disconnected"),
56        }
57    }
58}
59
60impl StdError for IpcError {
61    fn source(&self) -> Option<&(dyn StdError + 'static)> {
62        match *self {
63            IpcError::Bincode(ref err) => Some(err),
64            IpcError::Io(ref err) => Some(err),
65            IpcError::Disconnected => None,
66        }
67    }
68}
69
/// Error returned by the non-blocking and timed receive operations.
#[derive(Debug)]
pub enum TryRecvError {
    /// The receive attempt failed with the wrapped [`IpcError`].
    IpcError(IpcError),
    /// Rendered as `empty` by `Display`. Presumably no message was available; the value is
    /// produced by the platform layer, so confirm the exact meaning there.
    Empty,
}
75
76impl fmt::Display for TryRecvError {
77    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
78        match *self {
79            TryRecvError::IpcError(ref err) => write!(fmt, "ipc error: {}", err),
80            TryRecvError::Empty => write!(fmt, "empty"),
81        }
82    }
83}
84
85impl StdError for TryRecvError {
86    fn source(&self) -> Option<&(dyn StdError + 'static)> {
87        match *self {
88            TryRecvError::IpcError(ref err) => Some(err),
89            TryRecvError::Empty => None,
90        }
91    }
92}
93
94/// Create a connected [IpcSender] and [IpcReceiver] that
95/// transfer messages of a given type provided by type `T`
96/// or inferred by the types of messages sent by the sender.
97///
98/// Messages sent by the sender will be available to the
99/// receiver even if the sender or receiver has been moved
100/// to a different process. In addition, receivers and senders
101/// may be sent over an existing channel.
102///
103/// # Examples
104///
105/// ```
106/// # use ipc_channel::ipc;
107///
108/// let payload = "Hello, World!".to_owned();
109///
110/// // Create a channel
111/// let (tx, rx) = ipc::channel().unwrap();
112///
113/// // Send data
114/// tx.send(payload).unwrap();
115///
116/// // Receive the data
117/// let response = rx.recv().unwrap();
118///
119/// assert_eq!(response, "Hello, World!".to_owned());
120/// ```
121///
122/// [IpcSender]: struct.IpcSender.html
123/// [IpcReceiver]: struct.IpcReceiver.html
124pub fn channel<T>() -> Result<(IpcSender<T>, IpcReceiver<T>), io::Error>
125where
126    T: for<'de> Deserialize<'de> + Serialize,
127{
128    let (os_sender, os_receiver) = platform::channel()?;
129    let ipc_receiver = IpcReceiver {
130        os_receiver,
131        phantom: PhantomData,
132    };
133    let ipc_sender = IpcSender {
134        os_sender,
135        phantom: PhantomData,
136    };
137    Ok((ipc_sender, ipc_receiver))
138}
139
140/// Create a connected [IpcBytesSender] and [IpcBytesReceiver].
141///
142/// Note: The [IpcBytesSender] transfers messages of the type `[u8]`
143/// and the [IpcBytesReceiver] receives a `Vec<u8>`. This sender/receiver
144/// type does not serialize/deserialize messages through `serde`, making
145/// it more efficient where applicable.
146///
147/// # Examples
148///
149/// ```
150/// # use ipc_channel::ipc;
151///
152/// let payload = b"'Tis but a scratch!!";
153///
154/// // Create a channel
155/// let (tx, rx) = ipc::bytes_channel().unwrap();
156///
157/// // Send data
158/// tx.send(payload).unwrap();
159///
160/// // Receive the data
161/// let response = rx.recv().unwrap();
162///
163/// assert_eq!(response, payload);
164/// ```
165///
166/// [IpcBytesReceiver]: struct.IpcBytesReceiver.html
167/// [IpcBytesSender]: struct.IpcBytesSender.html
168pub fn bytes_channel() -> Result<(IpcBytesSender, IpcBytesReceiver), io::Error> {
169    let (os_sender, os_receiver) = platform::channel()?;
170    let ipc_bytes_receiver = IpcBytesReceiver { os_receiver };
171    let ipc_bytes_sender = IpcBytesSender { os_sender };
172    Ok((ipc_bytes_sender, ipc_bytes_receiver))
173}
174
175/// Receiving end of a channel using serialized messages.
176///
177/// # Examples
178///
179/// ## Blocking IO
180///
181/// ```
182/// # use ipc_channel::ipc;
183/// #
184/// # let (tx, rx) = ipc::channel().unwrap();
185/// #
186/// # let q = "Answer to the ultimate question of life, the universe, and everything";
187/// #
188/// # tx.send(q.to_owned()).unwrap();
189/// let response = rx.recv().unwrap();
190/// println!("Received data...");
191/// # assert_eq!(response, q);
192/// ```
193///
194/// ## Non-blocking IO
195///
196/// ```
197/// # use ipc_channel::ipc;
198/// #
199/// # let (tx, rx) = ipc::channel().unwrap();
200/// #
201/// # let answer = "42";
202/// #
203/// # tx.send(answer.to_owned()).unwrap();
204/// loop {
205///     match rx.try_recv() {
206///         Ok(res) => {
207///             // Do something interesting with your result
208///             println!("Received data...");
209///             break;
210///         },
211///         Err(_) => {
212///             // Do something else useful while we wait
213///             println!("Still waiting...");
214///         }
215///     }
216/// }
217/// ```
218///
219/// ## Embedding Receivers
220///
221/// ```
222/// # use ipc_channel::ipc;
223/// #
224/// let (tx, rx) = ipc::channel().unwrap();
225/// let (embedded_tx, embedded_rx) = ipc::channel().unwrap();
226/// # let data = [0x45, 0x6d, 0x62, 0x65, 0x64, 0x64, 0x65, 0x64, 0x00];
227/// // Send the IpcReceiver
228/// tx.send(embedded_rx).unwrap();
229/// # embedded_tx.send(data.to_owned()).unwrap();
230/// // Receive the sent IpcReceiver
231/// let received_rx = rx.recv().unwrap();
232/// // Receive any data sent to the received IpcReceiver
233/// let rx_data = received_rx.recv().unwrap();
234/// # assert_eq!(rx_data, data);
235/// ```
236///
237/// # Implementation details
238///
239/// Each [IpcReceiver] is backed by the OS specific implementations of `OsIpcReceiver`.
240///
241/// [IpcReceiver]: struct.IpcReceiver.html
#[derive(Debug)]
pub struct IpcReceiver<T> {
    /// Platform-specific receiver that all receive operations delegate to.
    os_receiver: OsIpcReceiver,
    /// Zero-sized marker recording the message type `T`; no `T` value is stored.
    phantom: PhantomData<T>,
}
247
248impl<T> IpcReceiver<T>
249where
250    T: for<'de> Deserialize<'de> + Serialize,
251{
252    /// Blocking receive.
253    pub fn recv(&self) -> Result<T, IpcError> {
254        self.os_receiver.recv()?.to().map_err(IpcError::Bincode)
255    }
256
257    /// Non-blocking receive
258    pub fn try_recv(&self) -> Result<T, TryRecvError> {
259        self.os_receiver
260            .try_recv()?
261            .to()
262            .map_err(IpcError::Bincode)
263            .map_err(TryRecvError::IpcError)
264    }
265
266    /// Blocks for up to the specified duration attempting to receive a message.
267    ///
268    /// This may block for longer than the specified duration if the channel is busy. If your timeout
269    /// exceeds the duration that your operating system can represent in milliseconds, this may
270    /// block forever. At the time of writing, the smallest duration that may trigger this behavior
271    /// is over 24 days.
272    pub fn try_recv_timeout(&self, duration: Duration) -> Result<T, TryRecvError> {
273        self.os_receiver
274            .try_recv_timeout(duration)?
275            .to()
276            .map_err(IpcError::Bincode)
277            .map_err(TryRecvError::IpcError)
278    }
279
280    /// Erase the type of the channel.
281    ///
282    /// Useful for adding routes to a `RouterProxy`.
283    pub fn to_opaque(self) -> OpaqueIpcReceiver {
284        OpaqueIpcReceiver {
285            os_receiver: self.os_receiver,
286        }
287    }
288}
289
290impl<'de, T> Deserialize<'de> for IpcReceiver<T> {
291    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
292    where
293        D: Deserializer<'de>,
294    {
295        let os_receiver = deserialize_os_ipc_receiver(deserializer)?;
296        Ok(IpcReceiver {
297            os_receiver,
298            phantom: PhantomData,
299        })
300    }
301}
302
303impl<T> Serialize for IpcReceiver<T> {
304    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
305    where
306        S: Serializer,
307    {
308        serialize_os_ipc_receiver(&self.os_receiver, serializer)
309    }
310}
311
312/// Sending end of a channel using serialized messages.
313///
314///
315/// ## Embedding Senders
316///
317/// ```
318/// # use ipc_channel::ipc;
319/// #
320/// # let (tx, rx) = ipc::channel().unwrap();
321/// # let (embedded_tx, embedded_rx) = ipc::channel().unwrap();
322/// # let data = [0x45, 0x6d, 0x62, 0x65, 0x64, 0x64, 0x65, 0x64, 0x00];
323/// // Send the IpcSender
324/// tx.send(embedded_tx).unwrap();
325/// // Receive the sent IpcSender
326/// let received_tx = rx.recv().unwrap();
327/// // Send data from the received IpcSender
328/// received_tx.send(data.clone()).unwrap();
329/// # let rx_data = embedded_rx.recv().unwrap();
330/// # assert_eq!(rx_data, data);
331/// ```
#[derive(Debug)]
pub struct IpcSender<T> {
    /// Platform-specific sender that all send operations delegate to.
    os_sender: OsIpcSender,
    /// Zero-sized marker recording the message type `T`; no `T` value is stored.
    phantom: PhantomData<T>,
}
337
338impl<T> Clone for IpcSender<T>
339where
340    T: Serialize,
341{
342    fn clone(&self) -> IpcSender<T> {
343        IpcSender {
344            os_sender: self.os_sender.clone(),
345            phantom: PhantomData,
346        }
347    }
348}
349
350impl<T> IpcSender<T>
351where
352    T: Serialize,
353{
354    /// Create an [IpcSender] connected to a previously defined [IpcOneShotServer].
355    ///
356    /// [IpcSender]: struct.IpcSender.html
357    /// [IpcOneShotServer]: struct.IpcOneShotServer.html
358    pub fn connect(name: String) -> Result<IpcSender<T>, io::Error> {
359        Ok(IpcSender {
360            os_sender: OsIpcSender::connect(name)?,
361            phantom: PhantomData,
362        })
363    }
364
365    /// Send data across the channel to the receiver.
366    pub fn send(&self, data: T) -> Result<(), bincode::Error> {
367        let mut bytes = Vec::with_capacity(4096);
368        OS_IPC_CHANNELS_FOR_SERIALIZATION.with(|os_ipc_channels_for_serialization| {
369            OS_IPC_SHARED_MEMORY_REGIONS_FOR_SERIALIZATION.with(
370                |os_ipc_shared_memory_regions_for_serialization| {
371                    let old_os_ipc_channels =
372                        mem::take(&mut *os_ipc_channels_for_serialization.borrow_mut());
373                    let old_os_ipc_shared_memory_regions = mem::take(
374                        &mut *os_ipc_shared_memory_regions_for_serialization.borrow_mut(),
375                    );
376                    let os_ipc_shared_memory_regions;
377                    let os_ipc_channels;
378                    {
379                        bincode::serialize_into(&mut bytes, &data)?;
380                        os_ipc_channels = mem::replace(
381                            &mut *os_ipc_channels_for_serialization.borrow_mut(),
382                            old_os_ipc_channels,
383                        );
384                        os_ipc_shared_memory_regions = mem::replace(
385                            &mut *os_ipc_shared_memory_regions_for_serialization.borrow_mut(),
386                            old_os_ipc_shared_memory_regions,
387                        );
388                    };
389                    Ok(self.os_sender.send(
390                        &bytes[..],
391                        os_ipc_channels,
392                        os_ipc_shared_memory_regions,
393                    )?)
394                },
395            )
396        })
397    }
398
399    pub fn to_opaque(self) -> OpaqueIpcSender {
400        OpaqueIpcSender {
401            os_sender: self.os_sender,
402        }
403    }
404}
405
406impl<'de, T> Deserialize<'de> for IpcSender<T> {
407    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
408    where
409        D: Deserializer<'de>,
410    {
411        let os_sender = deserialize_os_ipc_sender(deserializer)?;
412        Ok(IpcSender {
413            os_sender,
414            phantom: PhantomData,
415        })
416    }
417}
418
419impl<T> Serialize for IpcSender<T> {
420    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
421    where
422        S: Serializer,
423    {
424        serialize_os_ipc_sender(&self.os_sender, serializer)
425    }
426}
427
428/// Collection of [IpcReceiver]s moved into the set; thus creating a common
429/// (and exclusive) endpoint for receiving messages on any of the added
430/// channels.
431///
432/// # Examples
433///
434/// ```
435/// # use ipc_channel::ipc::{self, IpcReceiverSet, IpcSelectionResult};
436/// let data = vec![0x52, 0x75, 0x73, 0x74, 0x00];
437/// let (tx, rx) = ipc::channel().unwrap();
438/// let mut rx_set = IpcReceiverSet::new().unwrap();
439///
440/// // Add the receiver to the receiver set and send the data
441/// // from the sender
442/// let rx_id = rx_set.add(rx).unwrap();
443/// tx.send(data.clone()).unwrap();
444///
445/// // Poll the receiver set for any readable events
446/// for event in rx_set.select().unwrap() {
447///     match event {
448///         IpcSelectionResult::MessageReceived(id, message) => {
449///             let rx_data: Vec<u8> = message.to().unwrap();
450///             assert_eq!(id, rx_id);
451///             assert_eq!(data, rx_data);
452///             println!("Received: {:?} from {}...", data, id);
453///         },
454///         IpcSelectionResult::ChannelClosed(id) => {
455///             assert_eq!(id, rx_id);
456///             println!("No more data from {}...", id);
457///         }
458///     }
459/// }
460/// ```
461/// [IpcReceiver]: struct.IpcReceiver.html
pub struct IpcReceiverSet {
    /// Platform-specific receiver set that `add` and `select` delegate to.
    os_receiver_set: OsIpcReceiverSet,
}
465
466impl IpcReceiverSet {
467    /// Create a new empty [IpcReceiverSet].
468    ///
469    /// Receivers may then be added to the set with the [add]
470    /// method.
471    ///
472    /// [add]: #method.add
473    /// [IpcReceiverSet]: struct.IpcReceiverSet.html
474    pub fn new() -> Result<IpcReceiverSet, io::Error> {
475        Ok(IpcReceiverSet {
476            os_receiver_set: OsIpcReceiverSet::new()?,
477        })
478    }
479
480    /// Add and consume the [IpcReceiver] to the set of receivers to be polled.
481    /// [IpcReceiver]: struct.IpcReceiver.html
482    pub fn add<T>(&mut self, receiver: IpcReceiver<T>) -> Result<u64, io::Error>
483    where
484        T: for<'de> Deserialize<'de> + Serialize,
485    {
486        Ok(self.os_receiver_set.add(receiver.os_receiver)?)
487    }
488
489    /// Add an [OpaqueIpcReceiver] to the set of receivers to be polled.
490    /// [OpaqueIpcReceiver]: struct.OpaqueIpcReceiver.html
491    pub fn add_opaque(&mut self, receiver: OpaqueIpcReceiver) -> Result<u64, io::Error> {
492        Ok(self.os_receiver_set.add(receiver.os_receiver)?)
493    }
494
495    /// Wait for IPC messages received on any of the receivers in the set. The
496    /// method will return multiple events. An event may be either a message
497    /// received or a channel closed event.
498    ///
499    /// [IpcReceiver]: struct.IpcReceiver.html
500    pub fn select(&mut self) -> Result<Vec<IpcSelectionResult>, io::Error> {
501        let results = self.os_receiver_set.select()?;
502        Ok(results
503            .into_iter()
504            .map(|result| match result {
505                OsIpcSelectionResult::DataReceived(os_receiver_id, ipc_message) => {
506                    IpcSelectionResult::MessageReceived(os_receiver_id, ipc_message)
507                },
508                OsIpcSelectionResult::ChannelClosed(os_receiver_id) => {
509                    IpcSelectionResult::ChannelClosed(os_receiver_id)
510                },
511            })
512            .collect())
513    }
514}
515
/// Shared memory descriptor that will be made accessible to the receiver
/// of an IPC message that contains the descriptor.
518///
519/// # Examples
520/// ```
521/// # use ipc_channel::ipc::{self, IpcSharedMemory};
522/// # let (tx, rx) = ipc::channel().unwrap();
523/// # let data = [0x76, 0x69, 0x6d, 0x00];
524/// let shmem = IpcSharedMemory::from_bytes(&data);
525/// tx.send(shmem.clone()).unwrap();
526/// # let rx_shmem = rx.recv().unwrap();
527/// # assert_eq!(shmem, rx_shmem);
528/// ```
#[derive(Clone, Debug, PartialEq)]
pub struct IpcSharedMemory {
    /// None represents no data (empty slice)
    ///
    /// The constructors store `None` for empty input instead of creating an OS region, and
    /// dereferencing such a value yields an empty slice.
    os_shared_memory: Option<OsIpcSharedMemory>,
}
534
535impl Deref for IpcSharedMemory {
536    type Target = [u8];
537
538    #[inline]
539    fn deref(&self) -> &[u8] {
540        if let Some(os_shared_memory) = &self.os_shared_memory {
541            os_shared_memory
542        } else {
543            &[]
544        }
545    }
546}
547
548impl<'de> Deserialize<'de> for IpcSharedMemory {
549    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
550    where
551        D: Deserializer<'de>,
552    {
553        let index: usize = Deserialize::deserialize(deserializer)?;
554        if index == usize::MAX {
555            return Ok(IpcSharedMemory::empty());
556        }
557
558        let os_shared_memory = OS_IPC_SHARED_MEMORY_REGIONS_FOR_DESERIALIZATION.with(
559            |os_ipc_shared_memory_regions_for_deserialization| {
560                // FIXME(pcwalton): This could panic if the data was corrupt and the index was out
561                // of bounds. We should return an `Err` result instead.
562                os_ipc_shared_memory_regions_for_deserialization.borrow_mut()[index]
563                    .take()
564                    .unwrap()
565            },
566        );
567        Ok(IpcSharedMemory {
568            os_shared_memory: Some(os_shared_memory),
569        })
570    }
571}
572
573impl Serialize for IpcSharedMemory {
574    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
575    where
576        S: Serializer,
577    {
578        if let Some(os_shared_memory) = &self.os_shared_memory {
579            let index = OS_IPC_SHARED_MEMORY_REGIONS_FOR_SERIALIZATION.with(
580                |os_ipc_shared_memory_regions_for_serialization| {
581                    let mut os_ipc_shared_memory_regions_for_serialization =
582                        os_ipc_shared_memory_regions_for_serialization.borrow_mut();
583                    let index = os_ipc_shared_memory_regions_for_serialization.len();
584                    os_ipc_shared_memory_regions_for_serialization.push(os_shared_memory.clone());
585                    index
586                },
587            );
588            debug_assert!(index < usize::MAX);
589            index
590        } else {
591            usize::MAX
592        }
593        .serialize(serializer)
594    }
595}
596
597impl IpcSharedMemory {
598    const fn empty() -> Self {
599        Self {
600            os_shared_memory: None,
601        }
602    }
603
604    /// Create shared memory initialized with the bytes provided.
605    pub fn from_bytes(bytes: &[u8]) -> IpcSharedMemory {
606        if bytes.is_empty() {
607            IpcSharedMemory::empty()
608        } else {
609            IpcSharedMemory {
610                os_shared_memory: Some(OsIpcSharedMemory::from_bytes(bytes)),
611            }
612        }
613    }
614
615    /// Create a chunk of shared memory that is filled with the byte
616    /// provided.
617    pub fn from_byte(byte: u8, length: usize) -> IpcSharedMemory {
618        if length == 0 {
619            IpcSharedMemory::empty()
620        } else {
621            IpcSharedMemory {
622                os_shared_memory: Some(OsIpcSharedMemory::from_byte(byte, length)),
623            }
624        }
625    }
626}
627
628/// Result for readable events returned from [IpcReceiverSet::select].
629///
630/// [IpcReceiverSet::select]: struct.IpcReceiverSet.html#method.select
pub enum IpcSelectionResult {
    /// A message received from the [`IpcReceiver`] in the [`IpcMessage`] form,
    /// identified by the `u64` value.
    MessageReceived(u64, IpcMessage),
    /// The channel has been closed for the [IpcReceiver] identified by the `u64` value.
    ///
    /// [IpcReceiver]: struct.IpcReceiver.html
    ChannelClosed(u64),
}
639
640impl IpcSelectionResult {
641    /// Helper method to move the value out of the [IpcSelectionResult] if it
642    /// is [MessageReceived].
643    ///
644    /// # Panics
645    ///
646    /// If the result is [ChannelClosed] this call will panic.
647    ///
648    /// [IpcSelectionResult]: enum.IpcSelectionResult.html
649    /// [MessageReceived]: enum.IpcSelectionResult.html#variant.MessageReceived
650    /// [ChannelClosed]: enum.IpcSelectionResult.html#variant.ChannelClosed
651    pub fn unwrap(self) -> (u64, IpcMessage) {
652        match self {
653            IpcSelectionResult::MessageReceived(id, message) => (id, message),
654            IpcSelectionResult::ChannelClosed(id) => {
655                panic!("IpcSelectionResult::unwrap(): channel {} closed", id)
656            },
657        }
658    }
659}
660
661/// Structure used to represent a raw message from an [`IpcSender`].
662///
663/// Use the [to] method to deserialize the raw result into the requested type.
664///
665/// [to]: #method.to
#[derive(PartialEq)]
pub struct IpcMessage {
    /// Raw message bytes; `to` deserializes them with `bincode`.
    pub(crate) data: Vec<u8>,
    /// Channels received with the message; made available to the sender/receiver
    /// deserializers while `to` runs.
    pub(crate) os_ipc_channels: Vec<OsOpaqueIpcChannel>,
    /// Shared memory regions received with the message; made available to the
    /// `IpcSharedMemory` deserializer while `to` runs.
    pub(crate) os_ipc_shared_memory_regions: Vec<OsIpcSharedMemory>,
}
672
673impl IpcMessage {
674    /// Create a new [`IpcMessage`] with data and without any [`OsOpaqueIpcChannel`]s and
675    /// [`OsIpcSharedMemory`] regions.
676    pub fn from_data(data: Vec<u8>) -> Self {
677        Self {
678            data,
679            os_ipc_channels: vec![],
680            os_ipc_shared_memory_regions: vec![],
681        }
682    }
683}
684
685impl Debug for IpcMessage {
686    fn fmt(&self, formatter: &mut Formatter) -> Result<(), fmt::Error> {
687        match String::from_utf8(self.data.clone()) {
688            Ok(string) => string.chars().take(256).collect::<String>().fmt(formatter),
689            Err(..) => self.data[0..min(self.data.len(), 256)].fmt(formatter),
690        }
691    }
692}
693
impl IpcMessage {
    /// Assemble a message from its raw bytes and the channels and shared memory regions that
    /// accompany it.
    pub(crate) fn new(
        data: Vec<u8>,
        os_ipc_channels: Vec<OsOpaqueIpcChannel>,
        os_ipc_shared_memory_regions: Vec<OsIpcSharedMemory>,
    ) -> IpcMessage {
        IpcMessage {
            data,
            os_ipc_channels,
            os_ipc_shared_memory_regions,
        }
    }

    /// Deserialize the raw data in the contained message into the inferred type.
    ///
    /// While `bincode` runs, this message's channels and shared memory regions are installed
    /// in the thread-local deserialization lists so that the `Deserialize` impls in this file
    /// can look them up by index. The previous contents of both lists are put back before
    /// returning, whether deserialization succeeded or failed.
    pub fn to<T>(mut self) -> Result<T, bincode::Error>
    where
        T: for<'de> Deserialize<'de> + Serialize,
    {
        OS_IPC_CHANNELS_FOR_DESERIALIZATION.with(|os_ipc_channels_for_deserialization| {
            OS_IPC_SHARED_MEMORY_REGIONS_FOR_DESERIALIZATION.with(
                |os_ipc_shared_memory_regions_for_deserialization| {
                    // Install this message's channels; the previous thread-local contents
                    // are parked in `self.os_ipc_channels` until the swap back below.
                    mem::swap(
                        &mut *os_ipc_channels_for_deserialization.borrow_mut(),
                        &mut self.os_ipc_channels,
                    );
                    // Install this message's regions, each wrapped in `Some` so that the
                    // `IpcSharedMemory` deserializer can move it out with `take()`.
                    let old_ipc_shared_memory_regions_for_deserialization = mem::replace(
                        &mut *os_ipc_shared_memory_regions_for_deserialization.borrow_mut(),
                        self.os_ipc_shared_memory_regions
                            .into_iter()
                            .map(Some)
                            .collect(),
                    );
                    let result = bincode::deserialize(&self.data[..]);
                    // Put the previous thread-local contents back.
                    *os_ipc_shared_memory_regions_for_deserialization.borrow_mut() =
                        old_ipc_shared_memory_regions_for_deserialization;
                    mem::swap(
                        &mut *os_ipc_channels_for_deserialization.borrow_mut(),
                        &mut self.os_ipc_channels,
                    );
                    /* Error check comes after doing cleanup,
                     * since we need the cleanup both in the success and the error cases. */
                    result
                },
            )
        })
    }
}
741
/// Sending end of a channel with the message type erased.
///
/// Obtained from `IpcSender::to_opaque`; `to` turns it back into a typed sender.
#[derive(Clone, Debug)]
pub struct OpaqueIpcSender {
    /// Platform-specific sender carried over from the typed sender.
    os_sender: OsIpcSender,
}
746
747impl OpaqueIpcSender {
748    pub fn to<'de, T>(self) -> IpcSender<T>
749    where
750        T: Deserialize<'de> + Serialize,
751    {
752        IpcSender {
753            os_sender: self.os_sender,
754            phantom: PhantomData,
755        }
756    }
757}
758
759impl<'de> Deserialize<'de> for OpaqueIpcSender {
760    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
761    where
762        D: Deserializer<'de>,
763    {
764        let os_sender = deserialize_os_ipc_sender(deserializer)?;
765        Ok(OpaqueIpcSender { os_sender })
766    }
767}
768
769impl Serialize for OpaqueIpcSender {
770    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
771    where
772        S: Serializer,
773    {
774        serialize_os_ipc_sender(&self.os_sender, serializer)
775    }
776}
777
/// Receiving end of a channel with the message type erased.
///
/// Obtained from `IpcReceiver::to_opaque`; `to` turns it back into a typed receiver.
#[derive(Debug)]
pub struct OpaqueIpcReceiver {
    /// Platform-specific receiver carried over from the typed receiver.
    os_receiver: OsIpcReceiver,
}
782
783impl OpaqueIpcReceiver {
784    pub fn to<'de, T>(self) -> IpcReceiver<T>
785    where
786        T: Deserialize<'de> + Serialize,
787    {
788        IpcReceiver {
789            os_receiver: self.os_receiver,
790            phantom: PhantomData,
791        }
792    }
793}
794
795impl<'de> Deserialize<'de> for OpaqueIpcReceiver {
796    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
797    where
798        D: Deserializer<'de>,
799    {
800        let os_receiver = deserialize_os_ipc_receiver(deserializer)?;
801        Ok(OpaqueIpcReceiver { os_receiver })
802    }
803}
804
805impl Serialize for OpaqueIpcReceiver {
806    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
807    where
808        S: Serializer,
809    {
810        serialize_os_ipc_receiver(&self.os_receiver, serializer)
811    }
812}
813
814/// A server associated with a given name.
815///
816/// # Examples
817///
818/// ## Basic Usage
819///
820/// ```
821/// use ipc_channel::ipc::{self, IpcOneShotServer, IpcSender, IpcReceiver};
822///
823/// let (server, server_name) = IpcOneShotServer::new().unwrap();
824/// let tx: IpcSender<Vec<u8>> = IpcSender::connect(server_name).unwrap();
825///
826/// tx.send(vec![0x10, 0x11, 0x12, 0x13]).unwrap();
827/// let (_, data): (_, Vec<u8>) = server.accept().unwrap();
828/// assert_eq!(data, vec![0x10, 0x11, 0x12, 0x13]);
829/// ```
830///
831/// ## Sending an [IpcSender]
832/// ```
833/// use ipc_channel::ipc::{self, IpcOneShotServer, IpcSender, IpcReceiver};
834/// let (server, name) = IpcOneShotServer::new().unwrap();
835///
836/// let (tx1, rx1): (IpcSender<Vec<u8>>, IpcReceiver<Vec<u8>>) = ipc::channel().unwrap();
837/// let tx0 = IpcSender::connect(name).unwrap();
838/// tx0.send(tx1).unwrap();
839///
840/// let (_, tx1): (_, IpcSender<Vec<u8>>) = server.accept().unwrap();
841/// tx1.send(vec![0x48, 0x65, 0x6b, 0x6b, 0x6f, 0x00]).unwrap();
842///
843/// let data = rx1.recv().unwrap();
844/// assert_eq!(data, vec![0x48, 0x65, 0x6b, 0x6b, 0x6f, 0x00]);
845/// ```
846/// [IpcSender]: struct.IpcSender.html
pub struct IpcOneShotServer<T> {
    /// Platform-specific one-shot server that `accept` delegates to.
    os_server: OsIpcOneShotServer,
    /// Zero-sized marker recording the message type `T`; no `T` value is stored.
    phantom: PhantomData<T>,
}
851
852impl<T> IpcOneShotServer<T>
853where
854    T: for<'de> Deserialize<'de> + Serialize,
855{
856    pub fn new() -> Result<(IpcOneShotServer<T>, String), io::Error> {
857        let (os_server, name) = OsIpcOneShotServer::new()?;
858        Ok((
859            IpcOneShotServer {
860                os_server,
861                phantom: PhantomData,
862            },
863            name,
864        ))
865    }
866
867    pub fn accept(self) -> Result<(IpcReceiver<T>, T), bincode::Error> {
868        let (os_receiver, ipc_message) = self.os_server.accept()?;
869        Ok((
870            IpcReceiver {
871                os_receiver,
872                phantom: PhantomData,
873            },
874            ipc_message.to()?,
875        ))
876    }
877}
878
/// Receiving end of a channel that does not use serialized messages.
#[derive(Debug)]
pub struct IpcBytesReceiver {
    /// Platform-specific receiver that all receive operations delegate to.
    os_receiver: OsIpcReceiver,
}
884
885impl IpcBytesReceiver {
886    /// Blocking receive.
887    #[inline]
888    pub fn recv(&self) -> Result<Vec<u8>, IpcError> {
889        match self.os_receiver.recv() {
890            Ok(ipc_message) => Ok(ipc_message.data),
891            Err(err) => Err(err.into()),
892        }
893    }
894
895    /// Non-blocking receive
896    pub fn try_recv(&self) -> Result<Vec<u8>, TryRecvError> {
897        match self.os_receiver.try_recv() {
898            Ok(ipc_message) => Ok(ipc_message.data),
899            Err(err) => Err(err.into()),
900        }
901    }
902}
903
904impl<'de> Deserialize<'de> for IpcBytesReceiver {
905    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
906    where
907        D: Deserializer<'de>,
908    {
909        let os_receiver = deserialize_os_ipc_receiver(deserializer)?;
910        Ok(IpcBytesReceiver { os_receiver })
911    }
912}
913
914impl Serialize for IpcBytesReceiver {
915    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
916    where
917        S: Serializer,
918    {
919        serialize_os_ipc_receiver(&self.os_receiver, serializer)
920    }
921}
922
/// Sending end of a channel that does not use serialized messages.
#[derive(Debug)]
pub struct IpcBytesSender {
    /// Platform-specific sender that `send` delegates to.
    os_sender: OsIpcSender,
}
928
929impl Clone for IpcBytesSender {
930    fn clone(&self) -> IpcBytesSender {
931        IpcBytesSender {
932            os_sender: self.os_sender.clone(),
933        }
934    }
935}
936
937impl<'de> Deserialize<'de> for IpcBytesSender {
938    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
939    where
940        D: Deserializer<'de>,
941    {
942        let os_sender = deserialize_os_ipc_sender(deserializer)?;
943        Ok(IpcBytesSender { os_sender })
944    }
945}
946
947impl Serialize for IpcBytesSender {
948    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
949    where
950        S: Serializer,
951    {
952        serialize_os_ipc_sender(&self.os_sender, serializer)
953    }
954}
955
956impl IpcBytesSender {
957    #[inline]
958    pub fn send(&self, data: &[u8]) -> Result<(), io::Error> {
959        self.os_sender
960            .send(data, vec![], vec![])
961            .map_err(io::Error::from)
962    }
963}
964
965fn serialize_os_ipc_sender<S>(os_ipc_sender: &OsIpcSender, serializer: S) -> Result<S::Ok, S::Error>
966where
967    S: Serializer,
968{
969    let index = OS_IPC_CHANNELS_FOR_SERIALIZATION.with(|os_ipc_channels_for_serialization| {
970        let mut os_ipc_channels_for_serialization = os_ipc_channels_for_serialization.borrow_mut();
971        let index = os_ipc_channels_for_serialization.len();
972        os_ipc_channels_for_serialization.push(OsIpcChannel::Sender(os_ipc_sender.clone()));
973        index
974    });
975    index.serialize(serializer)
976}
977
978fn deserialize_os_ipc_sender<'de, D>(deserializer: D) -> Result<OsIpcSender, D::Error>
979where
980    D: Deserializer<'de>,
981{
982    let index: usize = Deserialize::deserialize(deserializer)?;
983    OS_IPC_CHANNELS_FOR_DESERIALIZATION.with(|os_ipc_channels_for_deserialization| {
984        // FIXME(pcwalton): This could panic if the data was corrupt and the index was out of
985        // bounds. We should return an `Err` result instead.
986        Ok(os_ipc_channels_for_deserialization.borrow_mut()[index].to_sender())
987    })
988}
989
990fn serialize_os_ipc_receiver<S>(
991    os_receiver: &OsIpcReceiver,
992    serializer: S,
993) -> Result<S::Ok, S::Error>
994where
995    S: Serializer,
996{
997    let index = OS_IPC_CHANNELS_FOR_SERIALIZATION.with(|os_ipc_channels_for_serialization| {
998        let mut os_ipc_channels_for_serialization = os_ipc_channels_for_serialization.borrow_mut();
999        let index = os_ipc_channels_for_serialization.len();
1000        os_ipc_channels_for_serialization.push(OsIpcChannel::Receiver(os_receiver.consume()));
1001        index
1002    });
1003    index.serialize(serializer)
1004}
1005
1006fn deserialize_os_ipc_receiver<'de, D>(deserializer: D) -> Result<OsIpcReceiver, D::Error>
1007where
1008    D: Deserializer<'de>,
1009{
1010    let index: usize = Deserialize::deserialize(deserializer)?;
1011
1012    OS_IPC_CHANNELS_FOR_DESERIALIZATION.with(|os_ipc_channels_for_deserialization| {
1013        // FIXME(pcwalton): This could panic if the data was corrupt and the index was out
1014        // of bounds. We should return an `Err` result instead.
1015        Ok(os_ipc_channels_for_deserialization.borrow_mut()[index].to_receiver())
1016    })
1017}