use anyhow::Result;
use futures_core::Stream;
use protocol::codec;
use protocol::{ClientMessage, ServerMessage};
use std::path::Path;
use tokio::net::unix::{OwnedReadHalf, OwnedWriteHalf};
/// Client end of a Unix-domain-socket connection that exchanges
/// [`ClientMessage`]s and [`ServerMessage`]s through the `protocol` codec.
///
/// The socket is held as two owned halves so that writing a request and
/// reading replies borrow separate fields.
#[derive(Debug)]
pub struct Connection {
    /// Read half of the split socket; server messages are decoded from it.
    reader: OwnedReadHalf,
    /// Write half of the split socket; client messages are encoded onto it.
    writer: OwnedWriteHalf,
}
impl Connection {
    /// Connects to the Unix domain socket at `socket_path` and splits the
    /// resulting stream into owned read and write halves.
    ///
    /// # Errors
    ///
    /// Returns the I/O error produced by the connection attempt.
    pub async fn connect(socket_path: &Path) -> Result<Self> {
        let stream = tokio::net::UnixStream::connect(socket_path).await?;
        tracing::debug!("connected to {}", socket_path.display());
        let (reader, writer) = stream.into_split();
        Ok(Self { reader, writer })
    }

    /// Writes `msg` to the server and waits for exactly one reply.
    ///
    /// # Errors
    ///
    /// Returns an error if the codec fails to write the request or to read
    /// the reply.
    pub async fn send(&mut self, msg: ClientMessage) -> Result<ServerMessage> {
        self.write_message(&msg).await?;
        self.read_message().await
    }

    /// Writes `msg`, then yields each server message until the exchange ends.
    ///
    /// Termination rules:
    /// * [`ServerMessage::StreamEnd`] ends the stream and is *not* yielded.
    /// * [`ServerMessage::Error`] is yielded as an `Ok` item and then ends the stream.
    /// * A codec read/write failure is yielded as `Err` and ends the stream.
    ///
    /// The request is written when the stream is first polled, not when this
    /// method is called.
    pub fn stream(&mut self, msg: ClientMessage) -> impl Stream<Item = Result<ServerMessage>> + '_ {
        async_stream::try_stream! {
            self.write_message(&msg).await?;
            loop {
                let server_msg = self.read_message().await?;
                // The end marker is swallowed: callers only see payload messages.
                if matches!(server_msg, ServerMessage::StreamEnd { .. }) {
                    break;
                }
                // Decide before yielding, because `yield` moves the message out.
                let is_error = matches!(server_msg, ServerMessage::Error { .. });
                yield server_msg;
                if is_error {
                    break;
                }
            }
        }
    }

    /// Writes `msg`, then yields each server message of a download exchange.
    ///
    /// Unlike [`Connection::stream`], the terminal message is always yielded:
    /// both [`ServerMessage::DownloadEnd`] and [`ServerMessage::Error`] are
    /// passed to the caller as `Ok` items before the stream ends. A codec
    /// read/write failure is yielded as `Err` and ends the stream.
    ///
    /// The request is written when the stream is first polled, not when this
    /// method is called.
    pub fn download_stream(
        &mut self,
        msg: ClientMessage,
    ) -> impl Stream<Item = Result<ServerMessage>> + '_ {
        async_stream::try_stream! {
            self.write_message(&msg).await?;
            loop {
                let server_msg = self.read_message().await?;
                // Decide before yielding, because `yield` moves the message out.
                let is_terminal = matches!(
                    server_msg,
                    ServerMessage::DownloadEnd { .. } | ServerMessage::Error { .. }
                );
                yield server_msg;
                if is_terminal {
                    break;
                }
            }
        }
    }

    /// Consumes the connection, dropping both socket halves.
    pub fn close(self) {
        drop(self);
    }

    /// Encodes `msg` with the protocol codec and writes it to the socket.
    ///
    /// The codec error is flattened to its `Display` text, matching
    /// [`Connection::read_message`].
    async fn write_message(&mut self, msg: &ClientMessage) -> Result<()> {
        codec::write_message(&mut self.writer, msg)
            .await
            .map_err(|e| anyhow::anyhow!("{e}"))?;
        Ok(())
    }

    /// Reads and decodes the next server message from the socket.
    ///
    /// The codec error is flattened to its `Display` text.
    async fn read_message(&mut self) -> Result<ServerMessage> {
        codec::read_message(&mut self.reader)
            .await
            .map_err(|e| anyhow::anyhow!("{e}"))
    }
}