unix_fifo_async/
named_pipe.rs1use async_std::{fs, io};
2use std::path::PathBuf;
3
4#[derive(Clone)]
9pub struct NamedPipePath {
10 inner: PathBuf,
11}
12
13impl NamedPipePath {
14 pub fn new<T: Into<PathBuf>>(path: T) -> Self {
16 Self { inner: path.into() }
17 }
18 pub fn exists(&self) -> bool {
20 self.inner.exists()
21 }
22 pub fn ensure_exists(&self) -> nix::Result<()> {
24 if !self.exists() {
25 crate::create_pipe(&self.inner, None)
26 } else {
27 Ok(())
28 }
29 }
30 pub async fn delete(self) -> io::Result<()> {
32 if self.inner.exists() {
33 crate::remove_pipe(&self.inner).await
34 } else {
35 Ok(())
36 }
37 }
38
39 pub fn open_read(&self) -> NamedPipeReader {
41 NamedPipeReader::from_path(self)
42 }
43 pub fn open_write(&self) -> NamedPipeWriter {
45 NamedPipeWriter::from_path(self)
46 }
47}
48
49pub struct NamedPipeReader {
51 path: NamedPipePath,
52}
53
54impl NamedPipeReader {
55 pub fn from_path(source: &NamedPipePath) -> Self {
57 Self {
58 path: source.clone(),
59 }
60 }
61 pub fn ensure_pipe_exists(&self) -> nix::Result<&Self> {
63 self.path.ensure_exists()?;
64 Ok(self)
65 }
66 pub async fn read(&self) -> io::Result<Vec<u8>> {
69 fs::read(&self.path.inner).await
70 }
71 pub async fn read_string(&self) -> io::Result<String> {
74 fs::read_to_string(&self.path.inner).await
75 }
76}
77
78pub struct NamedPipeWriter {
80 path: NamedPipePath,
81}
82
83impl NamedPipeWriter {
84 async fn _write(&self, data: &[u8]) -> io::Result<()> {
85 use async_std::io::Write;
86 let mut file = async_std::fs::OpenOptions::new()
87 .write(true)
88 .create(false)
89 .open(&self.path.inner)
90 .await?;
91 file.write_all(data).await
92 }
93 pub fn from_path(source: &NamedPipePath) -> Self {
94 Self {
95 path: source.clone(),
96 }
97 }
98 pub fn ensure_pipe_exists(&self) -> nix::Result<&Self> {
100 self.path.ensure_exists()?;
101 Ok(self)
102 }
103 pub async fn write(&self, data: &[u8]) -> io::Result<()> {
106 self._write(data).await
107 }
108 pub async fn write_str(&self, data: &str) -> io::Result<()> {
111 self._write(data.as_bytes()).await
112 }
113}
114
115#[cfg(test)]
116mod tests {
117 use async_std::{task::{self, block_on}, io};
118 #[test]
119 fn write_and_read_threaded() -> io::Result<()> {
120 use std::thread;
121 let pipe = super::NamedPipePath::new("./test_pipe_3");
122 pipe.ensure_exists().unwrap();
123 let writer = pipe.open_write();
124 let reader = pipe.open_read();
125 let data_to_send = "Hello pipe";
126 let t_write = thread::spawn(move || block_on(writer.write_str(data_to_send)));
127 let t_read = thread::spawn(move || block_on(reader.read_string()));
128 t_write.join().unwrap()?;
129 let read_result = t_read.join().unwrap()?;
130 assert_eq!(read_result, data_to_send);
131 block_on(pipe.delete())
132 }
133 #[test]
134 fn write_and_read_async() -> io::Result<()> {
135 block_on(async {
136 let pipe = super::NamedPipePath::new("./test_pipe_4");
137 pipe.ensure_exists().unwrap();
138 let writer = pipe.open_write();
139 let reader = pipe.open_read();
140 let data_to_send = "Hello pipe";
141 let t1 = task::spawn(async move { writer.write_str(data_to_send).await });
142 let t2 = task::spawn(async move { reader.read_string().await });
143 t1.await?;
144 let read_result = t2.await?;
145 assert_eq!(read_result, data_to_send);
146 pipe.delete().await
147 })
148 }
149 #[test]
150 fn ensure_on_write() -> io::Result<()> {
151 block_on(async {
152 let pipe = super::NamedPipePath::new("./test_pipe_5");
153 pipe.ensure_exists().unwrap();
154 let writer = pipe.open_write();
155 let reader = pipe.open_read();
156 let data_to_send = "Hello pipe";
157 let t1 = task::spawn(async move {
158 writer
159 .ensure_pipe_exists()
160 .unwrap()
161 .write_str(data_to_send)
162 .await
163 });
164 let t2 = task::spawn(async move { reader.read_string().await });
165 t1.await?;
166 let read_result = t2.await?;
167 assert_eq!(read_result, data_to_send);
168 pipe.delete().await
169 })
170 }
171 #[test]
172 fn ensure_on_read() -> io::Result<()> {
173 block_on(async {
174 let pipe = super::NamedPipePath::new("./test_pipe_6");
175 pipe.ensure_exists().unwrap();
176 let writer = pipe.open_write();
177 let reader = pipe.open_read();
178 let data_to_send = "Hello pipe";
179 let t1 = task::spawn(async move { writer.write_str(data_to_send).await });
180 let t2 =
181 task::spawn(
182 async move { reader.ensure_pipe_exists().unwrap().read_string().await },
183 );
184 t1.await?;
185 let read_result = t2.await?;
186 assert_eq!(read_result, data_to_send);
187 pipe.delete().await
188 })
189 }
190}