unix_fifo_async/
named_pipe.rs

1use async_std::{fs, io};
2use std::path::PathBuf;
3
/// Represents a path to a Unix named pipe (FIFO).
///
/// Provides convenience methods to create readers and writers, as well as an
/// easy way to ensure the pipe actually exists.
///
/// The wrapper only stores the path; constructing it does not touch the
/// filesystem. It can be cloned, compared for equality (by path) and printed
/// with `{:?}` for diagnostics.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct NamedPipePath {
    /// Filesystem location of the pipe.
    inner: PathBuf,
}
12
13impl NamedPipePath {
14    /// Wraps a given path in a `NamedPipePath`.
15    pub fn new<T: Into<PathBuf>>(path: T) -> Self {
16        Self { inner: path.into() }
17    }
18    /// Checks if the path exists.
19    pub fn exists(&self) -> bool {
20        self.inner.exists()
21    }
22    /// Ensures the path exists, creating a named pipe in its place if it doesn't.
23    pub fn ensure_exists(&self) -> nix::Result<()> {
24        if !self.exists() {
25            crate::create_pipe(&self.inner, None)
26        } else {
27            Ok(())
28        }
29    }
30    /// Tries to delete the pipe from disk and consumes the `NamedPipe`.
31    pub async fn delete(self) -> io::Result<()> {
32        if self.inner.exists() {
33            crate::remove_pipe(&self.inner).await
34        } else {
35            Ok(())
36        }
37    }
38
39    /// Creates a reader for this named pipe.
40    pub fn open_read(&self) -> NamedPipeReader {
41        NamedPipeReader::from_path(self)
42    }
43    /// Creates a writer for this named pipe.
44    pub fn open_write(&self) -> NamedPipeWriter {
45        NamedPipeWriter::from_path(self)
46    }
47}
48
49/// A convenience wrapper for reading from Unix named pipes.
50pub struct NamedPipeReader {
51    path: NamedPipePath,
52}
53
54impl NamedPipeReader {
55    /// Creates a new reader, cloning the given NamedPipePath.
56    pub fn from_path(source: &NamedPipePath) -> Self {
57        Self {
58            path: source.clone(),
59        }
60    }
61    /// Checks if the named pipe actually exists and tries to create it if it doesn't.
62    pub fn ensure_pipe_exists(&self) -> nix::Result<&Self> {
63        self.path.ensure_exists()?;
64        Ok(self)
65    }
66    /// Reads all bytes from the pipe.
67    /// The returned Future will resolve when something is written to the pipe.
68    pub async fn read(&self) -> io::Result<Vec<u8>> {
69        fs::read(&self.path.inner).await
70    }
71    /// Reads a String from the pipe.
72    /// The returned Future will resolve when something is written to the pipe.
73    pub async fn read_string(&self) -> io::Result<String> {
74        fs::read_to_string(&self.path.inner).await
75    }
76}
77
78/// A convenience wrapper for writing to Unix named pipes.
79pub struct NamedPipeWriter {
80    path: NamedPipePath,
81}
82
83impl NamedPipeWriter {
84    async fn _write(&self, data: &[u8]) -> io::Result<()> {
85        use async_std::io::Write;
86        let mut file = async_std::fs::OpenOptions::new()
87            .write(true)
88            .create(false)
89            .open(&self.path.inner)
90            .await?;
91        file.write_all(data).await
92    }
93    pub fn from_path(source: &NamedPipePath) -> Self {
94        Self {
95            path: source.clone(),
96        }
97    }
98    /// Checks if the named pipe actually exists and tries to create it if it doesn't.
99    pub fn ensure_pipe_exists(&self) -> nix::Result<&Self> {
100        self.path.ensure_exists()?;
101        Ok(self)
102    }
103    /// Writes byte data to the pipe.
104    /// The returned Future will resolve when the bytes are read from the pipe.
105    pub async fn write(&self, data: &[u8]) -> io::Result<()> {
106        self._write(data).await
107    }
108    /// Writes &str data to the pipe.
109    /// The returned Future will resolve when the string is read from the pipe.
110    pub async fn write_str(&self, data: &str) -> io::Result<()> {
111        self._write(data.as_bytes()).await
112    }
113}
114
#[cfg(test)]
mod tests {
    use super::NamedPipePath;
    use async_std::{
        io,
        task::{self, block_on},
    };

    /// Round-trips a string through the pipe using two OS threads, each
    /// driving its own future to completion.
    #[test]
    fn write_and_read_threaded() -> io::Result<()> {
        let fifo = NamedPipePath::new("./test_pipe_3");
        fifo.ensure_exists().unwrap();
        let (tx, rx) = (fifo.open_write(), fifo.open_read());
        let payload = "Hello pipe";
        let write_thread = std::thread::spawn(move || block_on(tx.write_str(payload)));
        let read_thread = std::thread::spawn(move || block_on(rx.read_string()));
        write_thread.join().unwrap()?;
        let received = read_thread.join().unwrap()?;
        assert_eq!(received, payload);
        block_on(fifo.delete())
    }

    /// Round-trips a string through the pipe using two spawned async tasks.
    #[test]
    fn write_and_read_async() -> io::Result<()> {
        block_on(async {
            let fifo = NamedPipePath::new("./test_pipe_4");
            fifo.ensure_exists().unwrap();
            let (tx, rx) = (fifo.open_write(), fifo.open_read());
            let payload = "Hello pipe";
            let write_task = task::spawn(async move { tx.write_str(payload).await });
            let read_task = task::spawn(async move { rx.read_string().await });
            write_task.await?;
            let received = read_task.await?;
            assert_eq!(received, payload);
            fifo.delete().await
        })
    }

    /// Same round trip, with the writer calling `ensure_pipe_exists` before writing.
    #[test]
    fn ensure_on_write() -> io::Result<()> {
        block_on(async {
            let fifo = NamedPipePath::new("./test_pipe_5");
            fifo.ensure_exists().unwrap();
            let (tx, rx) = (fifo.open_write(), fifo.open_read());
            let payload = "Hello pipe";
            let write_task = task::spawn(async move {
                let checked = tx.ensure_pipe_exists().unwrap();
                checked.write_str(payload).await
            });
            let read_task = task::spawn(async move { rx.read_string().await });
            write_task.await?;
            let received = read_task.await?;
            assert_eq!(received, payload);
            fifo.delete().await
        })
    }

    /// Same round trip, with the reader calling `ensure_pipe_exists` before reading.
    #[test]
    fn ensure_on_read() -> io::Result<()> {
        block_on(async {
            let fifo = NamedPipePath::new("./test_pipe_6");
            fifo.ensure_exists().unwrap();
            let (tx, rx) = (fifo.open_write(), fifo.open_read());
            let payload = "Hello pipe";
            let write_task = task::spawn(async move { tx.write_str(payload).await });
            let read_task = task::spawn(async move {
                let checked = rx.ensure_pipe_exists().unwrap();
                checked.read_string().await
            });
            write_task.await?;
            let received = read_task.await?;
            assert_eq!(received, payload);
            fifo.delete().await
        })
    }
}