use std::{
fmt::Debug,
marker::PhantomData,
net::SocketAddr,
pin::Pin,
task::{Context, Poll},
};
use futures::Stream;
use tokio::{
net::{TcpStream, ToSocketAddrs},
sync::mpsc::{UnboundedReceiver, UnboundedSender},
};
use crate::{codecs::Codec, error::Error, handle::Handle};
pub struct TcpClient<C: Codec<OUT, IN>, OUT: Debug + Send + 'static, IN: Debug + Send + 'static> {
local_addr: SocketAddr,
out_tx: UnboundedSender<OUT>,
in_rx: UnboundedReceiver<(IN, SocketAddr)>,
handle: Handle<C, OUT, IN, SocketAddr>,
_c: PhantomData<C>,
}
impl<C: Codec<OUT, IN>, OUT: Debug + Send + 'static, IN: Debug + Send + 'static> Unpin
for TcpClient<C, OUT, IN>
{
}
impl<C: Codec<OUT, IN>, OUT: Debug + Send + 'static, IN: Debug + Send + 'static>
TcpClient<C, OUT, IN>
{
pub async fn connect<A: ToSocketAddrs>(addr: A) -> Result<Self, Error> {
let stream = TcpStream::connect(addr).await?;
let local_addr = stream.local_addr()?;
let (in_tx, in_rx) = tokio::sync::mpsc::unbounded_channel();
let (out_tx, out_rx) = tokio::sync::mpsc::unbounded_channel();
let addr = stream.peer_addr()?;
let (stream_rx, stream_tx) = stream.into_split();
let handle = Handle::new(addr, stream_rx, stream_tx, out_rx, in_tx).await?;
Ok(Self {
local_addr,
handle,
out_tx,
in_rx,
_c: PhantomData,
})
}
pub fn local_addr(&self) -> SocketAddr {
self.local_addr
}
pub async fn send(&mut self, msg: OUT) -> Result<(), Error> {
self.out_tx.send(msg).map_err(|_e| Error::Send)?;
Ok(())
}
pub fn close(self) {
let _ = self.handle.close();
}
}
impl<C: Codec<OUT, IN>, OUT: Debug + Send + Unpin + 'static, IN: Debug + Send + Unpin + 'static>
Stream for TcpClient<C, OUT, IN>
{
type Item = IN;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
match self.as_mut().in_rx.poll_recv(cx) {
Poll::Ready(Some((msg, _addr))) => Poll::Ready(Some(msg)),
Poll::Ready(None) => Poll::Ready(None), Poll::Pending => Poll::Pending,
}
}
}