use std::{
fmt::Debug,
marker::PhantomData,
path::{Path, PathBuf},
pin::Pin,
task::{Context, Poll},
};
use futures::Stream;
use tokio::{
net::UnixStream,
sync::mpsc::{UnboundedReceiver, UnboundedSender},
};
use super::UnixSocketId;
use crate::{codecs::Codec, error::Error, handle::Handle};
/// Client end of a Unix domain socket connection that exchanges typed messages.
///
/// Outgoing values of type `OUT` are pushed onto an unbounded channel, and
/// incoming values of type `IN` are pulled from a second unbounded channel.
/// `C` is the [`Codec`] parameterising the underlying [`Handle`].
pub struct UnixClient<C, OUT, IN>
where
    C: Codec<OUT, IN>,
    OUT: Debug + Send + 'static,
    IN: Debug + Send + 'static,
{
    /// Filesystem path that was handed to `connect`.
    local_path: PathBuf,
    /// Sending half of the outbound message channel.
    out_tx: UnboundedSender<OUT>,
    /// Receiving half of the inbound channel; each item carries the socket id it is tagged with.
    in_rx: UnboundedReceiver<(IN, UnixSocketId)>,
    /// Connection handle created over the two halves of the socket.
    handle: Handle<C, OUT, IN, UnixSocketId>,
    /// Zero-sized marker for the codec type parameter.
    _c: PhantomData<C>,
}
/// Explicitly declares `UnixClient` to be `Unpin` for every valid combination
/// of type parameters, so a `Pin<&mut UnixClient<..>>` can be dereferenced
/// mutably (the `Stream` implementation reaches its fields this way).
impl<C, OUT, IN> Unpin for UnixClient<C, OUT, IN>
where
    C: Codec<OUT, IN>,
    OUT: Debug + Send + 'static,
    IN: Debug + Send + 'static,
{
}
impl<C: Codec<OUT, IN>, OUT: Debug + Send + 'static, IN: Debug + Send + 'static>
UnixClient<C, OUT, IN>
{
pub async fn connect(path: &Path) -> Result<Self, Error> {
let stream = UnixStream::connect(path).await?;
let (in_tx, in_rx) = tokio::sync::mpsc::unbounded_channel();
let (out_tx, out_rx) = tokio::sync::mpsc::unbounded_channel();
let (stream_rx, stream_tx) = stream.into_split();
let handle = Handle::new(UnixSocketId(0), stream_rx, stream_tx, out_rx, in_tx).await?;
Ok(Self {
local_path: path.to_path_buf(),
handle,
out_tx,
in_rx,
_c: PhantomData,
})
}
pub fn local_path(&self) -> &Path {
&self.local_path
}
pub async fn send(&mut self, msg: OUT) -> Result<(), Error> {
self.out_tx.send(msg).map_err(|_e| Error::Send)?;
Ok(())
}
pub fn close(self) {
let _ = self.handle.close();
}
}
/// Exposes the inbound message channel as a [`Stream`] of `IN` values.
impl<C, OUT, IN> Stream for UnixClient<C, OUT, IN>
where
    C: Codec<OUT, IN>,
    OUT: Debug + Send + Unpin + 'static,
    IN: Debug + Send + Unpin + 'static,
{
    type Item = IN;

    /// Polls the inbound receiver and yields the next message.
    ///
    /// The socket id paired with each message is dropped; `Ready(None)` and
    /// `Pending` from the receiver are passed through unchanged.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        // `UnixClient` is `Unpin`, so the pin can be unwrapped into a plain `&mut`.
        let this = self.get_mut();
        this.in_rx
            .poll_recv(cx)
            .map(|received| received.map(|(msg, _socket_id)| msg))
    }
}