#[cfg(doc)]
use crate::handshake;
use crate::io::Connection;
use crate::message::Message;
use crate::DistributionFlags;
use futures::io::{AsyncRead, AsyncWrite};
/// Builds the sending and receiving halves of a message channel over `connection`.
///
/// The connection is cloned: the clone backs the returned [`Sender`] and the
/// original value backs the returned [`Receiver`].
///
/// `flags` is accepted but is not consulted by this function.
// NOTE(review): presumably `connection` must already have completed the
// distribution handshake before being passed here — confirm against callers.
pub fn channel<T>(connection: T, flags: DistributionFlags) -> (Sender<T>, Receiver<T>)
where
    T: AsyncRead + AsyncWrite + Unpin + Clone,
{
    // Explicitly discard the flags so the parameter does not trigger an
    // unused-variable warning.
    let _ = flags;

    let sender = Sender::new(connection.clone());
    let receiver = Receiver::new(connection);
    (sender, receiver)
}
// Tag byte that follows the 4-byte length prefix of every non-tick message:
// `Sender::send` writes it and `Receiver::recv` rejects any other value.
// NOTE(review): 112 is ASCII `p`; presumably the distribution protocol's
// "pass through" marker — confirm against the protocol specification.
const TYPE_TAG: u8 = 112;
/// Sending half of a message channel (see [`channel`]).
#[derive(Debug)]
pub struct Sender<T> {
// Framed-I/O wrapper around the underlying stream that messages are written to.
connection: Connection<T>,
}
impl<T> Sender<T>
where
    T: AsyncRead + AsyncWrite + Unpin,
{
    /// Wraps `connection` so that messages can be written to it.
    fn new(connection: T) -> Self {
        Self {
            connection: Connection::new(connection),
        }
    }

    /// Sends `message` over the connection and flushes it.
    ///
    /// Wire format:
    /// - [`Message::Tick`] is sent as a bare 4-byte length prefix of `0`.
    /// - Any other message is sent as a 4-byte length prefix (payload length
    ///   plus one for the tag byte), the tag byte, and then the encoded
    ///   payload.
    ///
    /// # Errors
    ///
    /// - [`SendError::Encode`] if the message cannot be encoded.
    /// - [`SendError::Io`] if writing or flushing fails, or (with
    ///   [`std::io::ErrorKind::InvalidInput`]) if the encoded message is too
    ///   large for its length to fit in the 32-bit prefix.
    pub async fn send(&mut self, message: Message) -> Result<(), SendError> {
        if matches!(message, Message::Tick) {
            self.connection.write_u32(0).await?;
        } else {
            let mut buf = Vec::new();
            message.write_into(&mut buf)?;

            // The prefix counts the tag byte too, so the payload itself must
            // be strictly smaller than `u32::MAX`. Reject oversized payloads
            // instead of silently truncating the length and corrupting the
            // framing.
            let payload_len = buf.len();
            if payload_len >= u32::MAX as usize {
                return Err(std::io::Error::new(
                    std::io::ErrorKind::InvalidInput,
                    "message is too large for a 32-bit length prefix",
                )
                .into());
            }
            // Lossless: `payload_len < u32::MAX` was checked above.
            let frame_len = payload_len as u32 + 1;

            self.connection.write_u32(frame_len).await?;
            self.connection.write_u8(TYPE_TAG).await?;
            self.connection.write_all(&buf).await?;
        }
        // Flush for ticks as well: an unflushed tick sitting in a write
        // buffer never reaches the peer.
        self.connection.flush().await?;
        Ok(())
    }
}
/// Receiving half of a message channel (see [`channel`]).
#[derive(Debug)]
pub struct Receiver<T> {
// Framed-I/O wrapper around the underlying stream that messages are read from.
connection: Connection<T>,
}
impl<T> Receiver<T>
where
    T: AsyncRead + AsyncWrite + Unpin,
{
    /// Wraps `connection` so that messages can be read from it.
    fn new(connection: T) -> Self {
        let connection = Connection::new(connection);
        Self { connection }
    }

    /// Receives the next message from the connection.
    ///
    /// A frame whose 4-byte length prefix is `0` is reported as
    /// [`Message::Tick`]. Otherwise the tag byte is validated and the
    /// remaining `length - 1` bytes are read and decoded.
    ///
    /// # Errors
    ///
    /// - [`RecvError::Closed`] if reading the length prefix fails with
    ///   [`std::io::ErrorKind::UnexpectedEof`].
    /// - [`RecvError::UnexpectedTypeTag`] if the tag byte is not the expected one.
    /// - Any other read failure is converted into a [`RecvError`], as is
    ///   whatever error `Message::read_from` reports.
    pub async fn recv(&mut self) -> Result<Message, RecvError> {
        let frame_len = match self.connection.read_u32().await {
            Ok(len) => len as usize,
            // EOF while waiting for the next frame is the only failure
            // reported as `Closed`.
            Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => {
                return Err(RecvError::Closed);
            }
            Err(e) => return Err(e.into()),
        };

        // An empty frame carries no tag and no payload: it is a tick.
        if frame_len == 0 {
            return Ok(Message::Tick);
        }

        match self.connection.read_u8().await? {
            TYPE_TAG => {}
            tag => return Err(RecvError::UnexpectedTypeTag { tag }),
        }

        // The tag byte has already been consumed from the frame.
        let mut payload = vec![0; frame_len - 1];
        self.connection.read_exact(&mut payload).await?;
        Message::read_from(&mut payload.as_slice())
    }

    /// Owned variant of [`Receiver::recv`]: consumes the receiver and, on
    /// success, returns it together with the received message.
    ///
    /// On failure the receiver is dropped and only the error is returned.
    pub async fn recv_owned(mut self) -> Result<(Message, Self), RecvError> {
        let message = self.recv().await?;
        Ok((message, self))
    }
}
/// Errors that `Sender::send` can return.
#[derive(Debug, thiserror::Error)]
#[non_exhaustive]
#[allow(missing_docs)]
pub enum SendError {
/// An `eetf::EncodeError`, converted automatically via `From`; its message
/// and source are forwarded unchanged.
#[error(transparent)]
Encode(#[from] eetf::EncodeError),
/// A `std::io::Error`, converted automatically via `From`; its message and
/// source are forwarded unchanged.
#[error(transparent)]
Io(#[from] std::io::Error),
}
/// Errors that `Receiver::recv` (and `Receiver::recv_owned`) can return.
#[derive(Debug, thiserror::Error)]
#[non_exhaustive]
#[allow(missing_docs)]
pub enum RecvError {
/// Reading the length prefix of the next frame failed with
/// `std::io::ErrorKind::UnexpectedEof`.
#[error("connection was closed by the peer")]
Closed,
/// A message carried an operation code that is not supported.
// NOTE(review): nothing in the code visible here constructs this variant;
// presumably it is produced by `Message::read_from` — confirm.
#[error("unsupported distributed operation {op}")]
UnsupportedOp { op: i32 },
/// The byte following the length prefix was not `TYPE_TAG`; `tag` is the
/// byte that was actually received.
#[error("expected type tag {TYPE_TAG} but got {tag}")]
UnexpectedTypeTag { tag: u8 },
/// An `eetf::DecodeError`, converted automatically via `From`; its message
/// and source are forwarded unchanged.
#[error(transparent)]
Decode(#[from] eetf::DecodeError),
/// A `std::io::Error`, converted automatically via `From`; its message and
/// source are forwarded unchanged.
#[error(transparent)]
Io(#[from] std::io::Error),
}