1use std::time::Duration;
2use tokio::{net::unix::pipe, time::sleep, io::AsyncReadExt};
3use nix::{
4 sys::stat::Mode,
5 unistd::mkfifo as mkfifo_internal,
6 libc
7};
8
/// Result alias used throughout this module: the error type is always
/// [`std::io::Error`], and the success type defaults to `()`.
pub type Result<T = ()> = std::result::Result<T, std::io::Error>;
10
11pub struct Sender<'a> {
12 path: &'a str,
13 sender: pipe::Sender
14}
15
16fn mkfifo(path: &str) -> Result {
17 const FIFO_MODE: Mode = match Mode::from_bits(0o666) {
18 Some(mode) => mode,
19 None => {
20 panic!("Couldn't construct FIFO_MODE.")
21 },
22 };
23
24 Ok(mkfifo_internal(path, FIFO_MODE)?)
25}
26
27impl<'a> Sender<'a> {
28 async fn open_sender(path: &str) -> Result<pipe::Sender> {
29 loop {
30 match pipe::OpenOptions::new().open_sender(path) {
31 Ok(sender) => break Ok(sender),
32 Err(error) if error.raw_os_error() == Some(libc::ENXIO) => {
36 sleep(Duration::from_millis(50)).await;
37 },
38 Err(error) if error.raw_os_error() == Some(libc::ENOENT) => mkfifo(path)?,
42 Err(error) => break Err(error)
43 }
44 }
45 }
46
47 pub async fn open(path: &'a str) -> Result<Sender<'a>> {
48 Ok(Sender {
49 path,
50 sender: Self::open_sender(path).await?
51 })
52 }
53 pub async fn send(&mut self, data: &[u8]) -> Result {
54 let message_length = data.len();
55 let header = &message_length.to_le_bytes();
56 let message = &[header, data].concat()[..];
57
58 loop {
59 match self.sender.try_write(message) {
60 Ok(_) => break Ok(()),
61 Err(error) if error.raw_os_error() == Some(libc::EPIPE) => {
63 self.sender = Self::open_sender(self.path).await?;
64 },
65 Err(error) => break Err(error)
66 }
67 }
68 }
69}
70
71pub struct Receiver<'a> {
72 path: &'a str,
73 receiver: pipe::Receiver
74}
75
76impl<'a> Receiver<'a> {
77 async fn open_receiver(path: &str) -> Result<pipe::Receiver> {
78 loop {
79 match pipe::OpenOptions::new().open_receiver(path) {
80 Ok(sender) => break Ok(sender),
81 Err(error) if error.raw_os_error() == Some(libc::ENOENT) => {
85 mkfifo(path)?;
86 },
87 Err(error) => break Err(error)
88 }
89 }
90 }
91
92 pub async fn open(path: &'a str) -> Result<Receiver<'a>> {
93 Ok(Receiver {
94 path,
95 receiver: Self::open_receiver(path).await?
96 })
97 }
98
99 pub async fn receive(&mut self) -> Result<Vec<u8>> {
100 let mut header = usize::MIN.to_le_bytes();
101
102 loop {
103 match self.receiver.read_exact(&mut header).await {
104 Ok(_) => break,
105 Err(error) if error.kind() == std::io::ErrorKind::UnexpectedEof => {
106 self.receiver = Self::open_receiver(self.path).await?;
107 },
108 Err(error) => return Err(error)
109 };
110 };
111
112 let message_length = usize::from_le_bytes(header);
113 let mut message = vec![0; message_length];
114
115 self.receiver.read_exact(&mut message).await?;
116
117 Ok(message)
118 }
119}