earst/
lib.rs

1use std::time::Duration;
2use tokio::{net::unix::pipe, time::sleep, io::AsyncReadExt};
3use nix::{
4    sys::stat::Mode,
5    unistd::mkfifo as mkfifo_internal,
6    libc
7};
8
9pub type Result<T = ()> = std::io::Result<T>;
10
11pub struct Sender<'a> {
12    path: &'a str,
13    sender: pipe::Sender
14}
15
16fn mkfifo(path: &str) -> Result {
17    const FIFO_MODE: Mode = match Mode::from_bits(0o666) {
18        Some(mode) => mode,
19        None => {
20            panic!("Couldn't construct FIFO_MODE.")
21        },
22    };
23
24    Ok(mkfifo_internal(path, FIFO_MODE)?)
25}
26
27impl<'a> Sender<'a> {
28    async fn open_sender(path: &str) -> Result<pipe::Sender> {
29        loop {
30            match pipe::OpenOptions::new().open_sender(path) {
31                Ok(sender) => break Ok(sender),
32                /* ENXIO = No such device or address
33                 * returned whenever there isn't a
34                 * receiving end for the pipe */
35                Err(error) if error.raw_os_error() == Some(libc::ENXIO) => {
36                    sleep(Duration::from_millis(50)).await;
37                },
38                /* ENOENT = No such file or directory
39                 * returned whenever the named pipe
40                 * does not exist (yet) */
41                Err(error) if error.raw_os_error() == Some(libc::ENOENT) => mkfifo(path)?,
42                Err(error) => break Err(error)
43            }
44        }
45    }
46
47    pub async fn open(path: &'a str) -> Result<Sender<'a>> {
48        Ok(Sender {
49            path,
50            sender: Self::open_sender(path).await?
51        })
52    }
53    pub async fn send(&mut self, data: &[u8]) -> Result {
54        let message_length = data.len();
55        let header = &message_length.to_le_bytes();
56        let message = &[header, data].concat()[..];
57
58        loop {
59            match self.sender.try_write(message) {
60                Ok(_) => break Ok(()),
61                // EPIPE = broken pipe
62                Err(error) if error.raw_os_error() == Some(libc::EPIPE) => {
63                    self.sender = Self::open_sender(self.path).await?;
64                },
65                Err(error) => break Err(error)
66            }
67        }
68    }
69}
70
71pub struct Receiver<'a> {
72    path: &'a str,
73    receiver: pipe::Receiver
74}
75
76impl<'a> Receiver<'a> {
77    async fn open_receiver(path: &str) -> Result<pipe::Receiver> {
78        loop {
79            match pipe::OpenOptions::new().open_receiver(path) {
80                Ok(sender) => break Ok(sender),
81                /* ENOENT = No such file or directory
82                 * returned whenever the named pipe
83                 * does not exist (yet) */
84                Err(error) if error.raw_os_error() == Some(libc::ENOENT) => {
85                    mkfifo(path)?;
86                },
87                Err(error) => break Err(error)
88            }
89        }
90    }
91
92    pub async fn open(path: &'a str) -> Result<Receiver<'a>> {
93        Ok(Receiver {
94            path,
95            receiver: Self::open_receiver(path).await?
96        })
97    }
98
99    pub async fn receive(&mut self) -> Result<Vec<u8>> {
100        let mut header = usize::MIN.to_le_bytes();
101
102        loop {
103            match self.receiver.read_exact(&mut header).await {
104                Ok(_) => break,
105                Err(error) if error.kind() == std::io::ErrorKind::UnexpectedEof => {
106                    self.receiver = Self::open_receiver(self.path).await?;
107                },
108                Err(error) => return Err(error)
109            };
110        };
111
112        let message_length = usize::from_le_bytes(header);
113        let mut message = vec![0; message_length];
114
115        self.receiver.read_exact(&mut message).await?;
116
117        Ok(message)
118    }
119}