use serde_json::{to_vec, from_slice};
use std::net::SocketAddr;
use super::*;
pub async fn session<I, O, R, T>(
stream: TcpStream,
handle_incoming: R,
await_outgoing: T,
) -> Result<(), Error>
where
I: Incoming,
O: Outgoing,
R: HandleIncoming<I>,
T: GetOutgoing<O>,
{
let peer_addr = stream.peer_addr().ok();
let tx_tcp = stream.clone();
let rx_tcp = stream;
let tx_task = transmit(tx_tcp, await_outgoing, peer_addr);
let rx_task = receive(rx_tcp, handle_incoming, peer_addr);
let result = or(tx_task, rx_task).await;
#[cfg(feature = "log")]
if let Err(error) = &result {
match peer_addr {
Some(addr) => log::error!("{}: {}", addr, error),
None => log::error!("<noaddr>: {}", error),
}
} else {
match peer_addr {
Some(addr) => log::debug!("{}: Connection Closed", addr),
None => log::debug!("<noaddr>: Connection Closed"),
}
}
result
}
#[allow(unused_variables)]
async fn receive<I, R>(
mut rx_tcp: TcpStream,
mut handler: R,
peer_addr: Option<SocketAddr>,
) -> Result<(), Error>
where
I: Incoming,
R: HandleIncoming<I>,
{
loop {
let mut len = [0u8; 8];
if rx_tcp.read_exact(&mut len).await.is_err() {
break Ok(());
}
let len = u64::from_be_bytes(len) as usize;
let mut buffer = vec![0u8; len];
if rx_tcp.read_exact(&mut buffer).await.is_err() {
break Err(Error::UnexpectedEof);
}
let incoming = from_slice(&buffer).map_err(Error::Codec)?;
#[cfg(feature = "log")]
match peer_addr {
Some(addr) => log::debug!("{}: Received {:?}", addr, incoming),
None => log::debug!("<noaddr>: Received {:?}", incoming),
}
handler.handle_incoming(incoming).await;
}
}
#[allow(unused_variables)]
async fn transmit<O, T>(
mut tx_tcp: TcpStream,
mut source: T,
peer_addr: Option<SocketAddr>,
) -> Result<(), Error>
where
O: Outgoing,
T: GetOutgoing<O>,
{
while let Some(outgoing) = source.get_outgoing().await {
#[cfg(feature = "log")]
match peer_addr {
Some(addr) => log::debug!("{}: Sending {:?}", addr, outgoing),
None => log::debug!("<noaddr>: Sending {:?}", outgoing),
}
let bytes = to_vec(&outgoing).map_err(Error::Codec)?;
let len = (bytes.len() as u64).to_be_bytes();
let mut tx_fail = false;
tx_fail |= tx_tcp.write_all(&len).await.is_err();
tx_fail |= tx_tcp.write_all(&bytes).await.is_err();
if tx_fail {
return Err(Error::UnexpectedEof);
}
}
Ok(())
}