local_socket/
connection.rs1use crate::SocketError;
2use futures::{stream::Stream, FutureExt};
3use message_sink::{MessageSink, SinkError};
4use std::{
5 path::PathBuf,
6 pin::Pin,
7 task::{Context, Poll},
8};
9use tokio::net::{UnixSocket, UnixStream};
10use tokio_util::compat::{Compat, TokioAsyncWriteCompatExt};
11
12pub struct SocketConnection {
13 sink: MessageSink<Compat<UnixStream>>,
14}
15
16impl SocketConnection {
17 pub(crate) fn new(stream: UnixStream) -> Self {
18 let stream = stream.compat_write();
19 let sink = MessageSink::new(stream);
20 Self { sink }
21 }
22 pub async fn connect(path: PathBuf) -> Result<Self, std::io::Error> {
23 let stream = UnixSocket::new_stream()?.connect(path).await.unwrap();
24 let sink = MessageSink::new(stream.compat_write());
25 Ok(Self { sink })
26 }
27 pub fn write(&mut self, data: Vec<u8>) -> Result<(), SocketError> {
28 match self.sink.write(data) {
29 Ok(_) => Ok(()),
30 Err(_) => Err(SocketError::DataCorrupt),
31 }
32 }
33}
34
35impl Stream for SocketConnection {
36 type Item = Result<Vec<u8>, SocketError>;
37 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
38 let socket = self.get_mut();
39
40 match socket.sink.poll_unpin(cx) {
41 Poll::Ready(Err(SinkError::Read(e))) => {
42 return Poll::Ready(Some(Err(SocketError::ReadFailure(e))))
43 }
44 Poll::Ready(Err(SinkError::Write(e))) => {
45 return Poll::Ready(Some(Err(SocketError::WriteFailure(e))))
46 }
47 Poll::Ready(Err(SinkError::LimitExceeded)) => {
48 return Poll::Ready(Some(Err(SocketError::BufferOverflow)))
49 }
50 Poll::Ready(Err(SinkError::Parse(_))) => {
51 return Poll::Ready(Some(Err(SocketError::DataCorrupt)))
52 }
53 Poll::Ready(Err(SinkError::Closed)) => return Poll::Ready(None),
54 Poll::Ready(Ok(message)) => return Poll::Ready(Some(Ok(message))),
55 Poll::Pending => {}
56 };
57
58 Poll::Pending
59 }
60}