1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152
use core::marker::PhantomData;
use std::io::Read;
use crate::error::Result;
use crate::io::{self, Reader};
use crate::packet::*;
use crate::Error;
/// The receiving-half of the channel. This is the same as [`std::sync::mpsc::Receiver`],
/// except for a [few key differences](crate).
///
/// See [crate-level documentation](crate).
pub struct Receiver<'r, T> {
_p: PhantomData<T>,
rx: Reader<'r>,
pbuf: PacketBuf,
pid: PacketId,
}
unsafe impl<T> Send for Receiver<'_, T> {}
unsafe impl<T> Sync for Receiver<'_, T> {}
impl<'r, T> Receiver<'r, T> {
/// Creates a new [`Receiver`] from `rx`.
pub fn new<R>(rx: R) -> Self
where
R: Read + 'r,
{
Self {
_p: PhantomData,
rx: Reader::new(rx),
pbuf: PacketBuf::new(),
pid: PacketId::new(),
}
}
/// Get a reference to the underlying reader.
pub fn get(&self) -> &dyn Read {
self.rx.get()
}
/// Get a mutable reference to the underlying reader. Directly reading from the stream is not advised.
pub fn get_mut(&mut self) -> &mut dyn Read {
self.rx.get_mut()
}
}
#[cfg(feature = "statistics")]
impl<T> Receiver<'_, T> {
/// Get statistics on this [`Receiver`](Self).
pub fn stats(&self) -> &crate::stats::RecvStats {
self.rx.stats()
}
}
impl<T> Receiver<'_, T> {
/// Attempts to receive an object from the data stream using
/// a custom deserialization function.
///
/// If the underlying data stream is a blocking socket then `recv()` will block until
/// an object is available.
///
/// If the underlying data stream is a non-blocking socket then `recv()` will return
/// an error with a kind of `std::io::ErrorKind::WouldBlock` whenever the complete object is not
/// available.
///
/// The first parameter passed to `de_fn` is the buffer with the
/// serialized data.
///
/// `de_fn` must return the deserialized object.
///
/// # Example
/// ```no_run
/// use channels::Receiver;
///
/// let mut rx = Receiver::<i32>::new(std::io::empty());
///
/// let number = rx.recv_with(|buf| {
/// let number = i32::from_be_bytes(buf[..2].try_into().unwrap());
/// Ok(number)
/// }).unwrap();
/// ```
pub fn recv_with<F>(&mut self, de_fn: F) -> Result<T>
where
F: FnOnce(&[u8]) -> Result<T>,
{
if self.pbuf.len() < PacketBuf::HEADER_SIZE {
io::fill_buffer_to(
&mut self.pbuf,
&mut self.rx,
PacketBuf::HEADER_SIZE,
)?;
if let Err(e) = self.pbuf.verify_header() {
self.pbuf.clear();
return Err(e);
}
if self.pbuf.get_id() != self.pid {
self.pbuf.clear();
return Err(Error::OutOfOrder);
}
self.pid.next_id();
}
let packet_len = usize::from(self.pbuf.get_packet_length());
if self.pbuf.len() < packet_len {
io::fill_buffer_to(
&mut self.pbuf,
&mut self.rx,
packet_len,
)?;
}
self.pbuf.clear();
let data = de_fn(
&self.pbuf.as_slice()[..packet_len]
[PacketBuf::HEADER_SIZE..],
)?;
#[cfg(feature = "statistics")]
self.rx.stats_mut().update_received_time();
Ok(data)
}
}
#[cfg(feature = "serde")]
impl<T: serde::de::DeserializeOwned> Receiver<'_, T> {
/// Attempts to read an object from the sender end.
///
/// If the underlying data stream is a blocking socket then `recv()` will block until
/// an object is available.
///
/// If the underlying data stream is a non-blocking socket then `recv()` will return
/// an error with a kind of `std::io::ErrorKind::WouldBlock` whenever the complete object is not
/// available.
///
/// # Example
/// ```no_run
/// use channels::Receiver;
///
/// let mut rx = Receiver::<i32>::new(std::io::empty());
///
/// let number = rx.recv().unwrap();
/// ```
pub fn recv(&mut self) -> Result<T> {
self.recv_with(crate::serde::deserialize)
}
}