local_socket/
connection.rs

1use crate::SocketError;
2use futures::{stream::Stream, FutureExt};
3use message_sink::{MessageSink, SinkError};
4use std::{
5    path::PathBuf,
6    pin::Pin,
7    task::{Context, Poll},
8};
9use tokio::net::{UnixSocket, UnixStream};
10use tokio_util::compat::{Compat, TokioAsyncWriteCompatExt};
11
12pub struct SocketConnection {
13    sink: MessageSink<Compat<UnixStream>>,
14}
15
16impl SocketConnection {
17    pub(crate) fn new(stream: UnixStream) -> Self {
18        let stream = stream.compat_write();
19        let sink = MessageSink::new(stream);
20        Self { sink }
21    }
22    pub async fn connect(path: PathBuf) -> Result<Self, std::io::Error> {
23        let stream = UnixSocket::new_stream()?.connect(path).await.unwrap();
24        let sink = MessageSink::new(stream.compat_write());
25        Ok(Self { sink })
26    }
27    pub fn write(&mut self, data: Vec<u8>) -> Result<(), SocketError> {
28        match self.sink.write(data) {
29            Ok(_) => Ok(()),
30            Err(_) => Err(SocketError::DataCorrupt),
31        }
32    }
33}
34
35impl Stream for SocketConnection {
36    type Item = Result<Vec<u8>, SocketError>;
37    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
38        let socket = self.get_mut();
39
40        match socket.sink.poll_unpin(cx) {
41            Poll::Ready(Err(SinkError::Read(e))) => {
42                return Poll::Ready(Some(Err(SocketError::ReadFailure(e))))
43            }
44            Poll::Ready(Err(SinkError::Write(e))) => {
45                return Poll::Ready(Some(Err(SocketError::WriteFailure(e))))
46            }
47            Poll::Ready(Err(SinkError::LimitExceeded)) => {
48                return Poll::Ready(Some(Err(SocketError::BufferOverflow)))
49            }
50            Poll::Ready(Err(SinkError::Parse(_))) => {
51                return Poll::Ready(Some(Err(SocketError::DataCorrupt)))
52            }
53            Poll::Ready(Err(SinkError::Closed)) => return Poll::Ready(None),
54            Poll::Ready(Ok(message)) => return Poll::Ready(Some(Ok(message))),
55            Poll::Pending => {}
56        };
57
58        Poll::Pending
59    }
60}