grr_plugin/
unix.rs

1use anyhow::anyhow;
2use async_stream::stream;
3use futures::Stream;
4use futures::TryFutureExt;
5use tempfile::{tempdir, TempDir};
6use tokio::net::UnixListener;
7
8use std::{
9    pin::Pin,
10    sync::Arc,
11    task::{Context, Poll},
12};
13
14use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
15use tonic::transport::server::Connected;
16
17use super::error::Error;
18
19const SOCKET_FILENAME: &str = "landslide_jsonrpc.sock";
20
21//own this so it doesn't go out of scope and get deleted
22pub struct TempSocket(TempDir);
23impl TempSocket {
24    pub fn new() -> Result<TempSocket, Error> {
25        Ok(Self(tempdir()?))
26    }
27
28    pub fn socket_filename(&self) -> Result<String, Error> {
29        let socket_path_buf = self.0.path().join(SOCKET_FILENAME);
30        let socket_path = String::from(socket_path_buf.to_str().ok_or_else(|| {
31            anyhow!(
32                "Unable to convert PathBuf {:?} to a String.",
33                socket_path_buf
34            )
35        })?);
36        Ok(socket_path)
37    }
38}
39
40pub async fn incoming_from_path(
41    path: &str,
42) -> Result<impl Stream<Item = Result<UnixStream, std::io::Error>>, Error> {
43    let uds = UnixListener::bind(path)?;
44
45    Ok(stream! {
46        loop {
47            let item = uds.accept().map_ok(|(st, _)| UnixStream(st)).await;
48
49            yield item;
50        }
51    })
52}
53
54#[derive(Debug)]
55pub struct UnixStream(pub tokio::net::UnixStream);
56
57impl Connected for UnixStream {
58    type ConnectInfo = UdsConnectInfo;
59
60    fn connect_info(&self) -> Self::ConnectInfo {
61        UdsConnectInfo {
62            peer_addr: self.0.peer_addr().ok().map(Arc::new),
63            peer_cred: self.0.peer_cred().ok(),
64        }
65    }
66}
67
68#[derive(Clone, Debug)]
69pub struct UdsConnectInfo {
70    pub peer_addr: Option<Arc<tokio::net::unix::SocketAddr>>,
71    pub peer_cred: Option<tokio::net::unix::UCred>,
72}
73
74impl AsyncRead for UnixStream {
75    fn poll_read(
76        mut self: Pin<&mut Self>,
77        cx: &mut Context<'_>,
78        buf: &mut ReadBuf<'_>,
79    ) -> Poll<std::io::Result<()>> {
80        Pin::new(&mut self.0).poll_read(cx, buf)
81    }
82}
83
84impl AsyncWrite for UnixStream {
85    fn poll_write(
86        mut self: Pin<&mut Self>,
87        cx: &mut Context<'_>,
88        buf: &[u8],
89    ) -> Poll<std::io::Result<usize>> {
90        Pin::new(&mut self.0).poll_write(cx, buf)
91    }
92
93    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
94        Pin::new(&mut self.0).poll_flush(cx)
95    }
96
97    fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
98        Pin::new(&mut self.0).poll_shutdown(cx)
99    }
100}