1use std::collections::VecDeque;
2use std::future::Future;
3use std::io::{BufRead, Error, ErrorKind, IoSlice, Write};
4use std::pin::Pin;
5use std::task::{Context, Poll};
6
7use futures_io::{AsyncBufRead, AsyncRead, AsyncWrite};
8use loole::{Receiver, RecvFuture, SendFuture, Sender, unbounded};
9
10use crate::state::SharedState;
11
12pub fn async_pipe() -> (AsyncWriter, AsyncReader) {
34 let (sender, receiver) = unbounded();
35 let state = SharedState::default();
36
37 (
38 AsyncWriter {
39 sender,
40 state: state.clone(),
41 send_future: None,
42 },
43 AsyncReader {
44 receiver,
45 state,
46 buf: VecDeque::new(),
47 reading: None,
48 },
49 )
50}
51
#[cfg(feature = "async")]
#[cfg(feature = "sync")]
/// Creates a pipe with a synchronous writer and an asynchronous reader.
///
/// Only compiled when both the `async` and the `sync` features are enabled.
/// The halves share one [`SharedState`] (the writer holds a clone) and the two
/// ends of one unbounded `loole` channel.
///
/// Returns the pair as `(writer, reader)`.
pub fn async_reader_pipe() -> (crate::Writer, AsyncReader) {
    let (tx, rx) = unbounded();
    let shared = SharedState::default();

    let writer = crate::Writer {
        sender: tx,
        state: shared.clone(),
    };
    let reader = AsyncReader {
        receiver: rx,
        state: shared,
        buf: VecDeque::new(),
        // No receive is in flight for a freshly created reader.
        reading: None,
    };

    (writer, reader)
}
92
#[cfg(feature = "async")]
#[cfg(feature = "sync")]
/// Creates a pipe with an asynchronous writer and a synchronous reader.
///
/// Only compiled when both the `async` and the `sync` features are enabled.
/// The halves share one [`SharedState`] (the writer holds a clone) and the two
/// ends of one unbounded `loole` channel.
///
/// Returns the pair as `(writer, reader)`.
pub fn async_writer_pipe() -> (AsyncWriter, crate::Reader) {
    let (tx, rx) = unbounded();
    let shared = SharedState::default();

    let writer = AsyncWriter {
        sender: tx,
        state: shared.clone(),
        // No notification is in flight for a freshly created writer.
        send_future: None,
    };
    let reader = crate::Reader {
        receiver: rx,
        state: shared,
        buf: VecDeque::new(),
    };

    (writer, reader)
}
133
134#[derive(Debug)]
138pub struct AsyncWriter {
139 sender: Sender<()>,
140 send_future: Option<SendFuture<()>>,
141 state: SharedState,
142}
143
144impl Clone for AsyncWriter {
145 fn clone(&self) -> Self {
146 Self {
147 sender: self.sender.clone(),
148 send_future: None,
149 state: self.state.clone(),
150 }
151 }
152}
153
154impl AsyncWriter {
155 fn poll_send(&mut self, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
156 if self.send_future.is_none() {
157 self.send_future = Some(self.sender.send_async(()));
158 }
159 match Pin::new(&mut self.send_future.as_mut().unwrap()).poll(cx) {
160 Poll::Ready(Ok(_)) => {
161 self.send_future = None;
162 Poll::Ready(Ok(()))
163 }
164 Poll::Ready(Err(e)) => {
165 self.send_future = None;
166 Poll::Ready(Err(Error::new(ErrorKind::WriteZero, e)))
167 }
168 Poll::Pending => Poll::Pending,
169 }
170 }
171}
172
173impl AsyncWrite for AsyncWriter {
174 fn poll_write(
175 mut self: Pin<&mut Self>,
176 cx: &mut Context<'_>,
177 buf: &[u8],
178 ) -> Poll<std::io::Result<usize>> {
179 let n = self.state.write(buf)?;
180 match self.poll_send(cx)? {
181 Poll::Ready(_) => Poll::Ready(Ok(n)),
182 Poll::Pending => Poll::Pending,
183 }
184 }
185
186 fn poll_write_vectored(
187 mut self: Pin<&mut Self>,
188 cx: &mut Context<'_>,
189 bufs: &[IoSlice<'_>],
190 ) -> Poll<std::io::Result<usize>> {
191 let n = self.state.write_vectored(bufs)?;
192 match self.poll_send(cx)? {
193 Poll::Ready(_) => Poll::Ready(Ok(n)),
194 Poll::Pending => Poll::Pending,
195 }
196 }
197
198 fn poll_flush(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
199 Poll::Ready(self.state.flush())
200 }
201
202 fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
203 self.sender.close();
204 Poll::Ready(Ok(()))
205 }
206}
207
208#[derive(Debug)]
212pub struct AsyncReader {
213 receiver: Receiver<()>,
214 buf: VecDeque<u8>,
215 reading: Option<RecvFuture<()>>,
216 state: SharedState,
217}
218
219impl AsyncBufRead for AsyncReader {
220 fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<&[u8]>> {
221 let this = self.get_mut();
222 while this.buf.is_empty() {
223 let n = this.state.copy_to(&mut this.buf)?;
224 if n == 0 {
225 if this.reading.is_none() {
226 this.reading = Some(this.receiver.recv_async())
227 }
228
229 match Pin::new(this.reading.as_mut().unwrap()).poll(cx) {
230 Poll::Ready(Ok(_)) => {
231 this.reading = None;
232 }
233 Poll::Ready(Err(_)) => {
234 this.reading = None;
235 break;
236 }
237 Poll::Pending => return Poll::Pending,
238 }
239 }
240 }
241
242 if this.buf.is_empty() {
243 _ = this.state.copy_to(&mut this.buf)?;
244 }
245
246 Poll::Ready(this.buf.fill_buf())
247 }
248
249 fn consume(mut self: Pin<&mut Self>, amt: usize) {
250 self.buf.consume(amt)
251 }
252}
253
254impl AsyncRead for AsyncReader {
255 fn poll_read(
256 mut self: Pin<&mut Self>,
257 cx: &mut Context<'_>,
258 mut buf: &mut [u8],
259 ) -> Poll<std::io::Result<usize>> {
260 let data = match self.as_mut().poll_fill_buf(cx)? {
261 Poll::Ready(buf) => buf,
262 Poll::Pending => return Poll::Pending,
263 };
264 let n = buf.write(data)?;
265 self.consume(n);
266 Poll::Ready(Ok(n))
267 }
268}
269
#[cfg(test)]
mod tests {
    use std::io::IoSlice;
    use std::thread::spawn;

    use futures::{
        AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, StreamExt, TryStreamExt, executor::block_on,
    };

    /// Many writes succeed while the reader is alive, even if it never reads.
    #[test]
    fn base_write_case() {
        block_on(async {
            let (mut tx, rx) = crate::async_pipe();
            let payload = "hello".as_bytes();
            for _ in 0..1000 {
                tx.write_all(payload).await.unwrap();
            }
            drop(rx)
        })
    }

    /// Two writes followed by dropping the writer are read back as one string.
    #[test]
    fn base_read_case() {
        block_on(async {
            let (mut tx, mut rx) = crate::async_pipe();

            for part in ["hello ", "world"] {
                tx.write_all(part.as_bytes()).await.unwrap();
            }
            drop(tx);

            let mut text = String::new();
            rx.read_to_string(&mut text).await.unwrap();

            assert_eq!("hello world".to_string(), text);
        });
    }

    /// A single vectored write of two slices is read back concatenated.
    #[test]
    fn base_vectored_case() {
        block_on(async {
            let (mut tx, mut rx) = crate::async_pipe();
            let slices = [
                IoSlice::new("hello ".as_bytes()),
                IoSlice::new("world".as_bytes()),
            ];
            _ = tx.write_vectored(&slices).await.unwrap();
            drop(tx);

            let mut text = String::new();
            rx.read_to_string(&mut text).await.unwrap();

            assert_eq!("hello world".to_string(), text);
        });
    }

    /// A thousand cloned writers driven concurrently on one executor all reach
    /// the single reader.
    #[test]
    fn thread_case() {
        block_on(async {
            let (origin, mut rx) = crate::async_pipe();
            let clones = (0..1000).map(|_| origin.clone()).collect::<Vec<_>>();
            let total = clones.len();
            drop(origin);

            let all_writes = futures::stream::iter(clones)
                .map(|mut w| async move { w.write_all("hello".as_bytes()).await })
                .buffer_unordered(total)
                .try_collect::<Vec<()>>();

            let mut text = String::new();
            let read_all = rx.read_to_string(&mut text);
            futures::join!(
                async {
                    all_writes.await.unwrap();
                },
                async { read_all.await.unwrap() }
            );

            assert_eq!("hello".repeat(total), text);
        });
    }

    /// Writing after the reader has been dropped reports an error.
    #[test]
    fn writer_err_case() {
        block_on(async {
            let (mut tx, rx) = crate::async_pipe();
            drop(rx);

            let outcome = tx.write("hello".as_bytes()).await;
            assert!(outcome.is_err());
        });
    }

    /// `read_line` yields each line in turn and then reports end of input.
    #[test]
    fn bufread_case() {
        block_on(async {
            let (mut tx, mut rx) = crate::async_pipe();
            tx.write_all("hello\n".as_bytes()).await.unwrap();
            tx.write_all("world".as_bytes()).await.unwrap();
            drop(tx);

            let mut line = String::new();
            assert_ne!(0, rx.read_line(&mut line).await.unwrap());
            assert_eq!("hello\n".to_string(), line);

            line.clear();
            assert_ne!(0, rx.read_line(&mut line).await.unwrap());
            assert_eq!("world".to_string(), line);

            line.clear();
            assert_eq!(0, rx.read_line(&mut line).await.unwrap());
        });
    }

    /// The `lines` stream produces exactly two successful items.
    #[test]
    fn bufread_lines_case() {
        block_on(async {
            let (mut tx, rx) = crate::async_pipe();
            tx.write_all("hello\n".as_bytes()).await.unwrap();
            tx.write_all("world".as_bytes()).await.unwrap();
            drop(tx);

            let line_count = rx
                .lines()
                .map(|line| assert!(line.is_ok()))
                .count()
                .await;
            assert_eq!(2, line_count);
        });
    }

    /// Synchronous writers on a thousand threads feed one asynchronous reader.
    #[test]
    fn thread_writer_case() {
        use std::io::Write;

        let (origin, mut rx) = crate::async_reader_pipe();
        for _ in 0..1000 {
            let mut producer = origin.clone();
            spawn(move || {
                producer.write_all("hello".as_bytes()).unwrap();
            });
        }
        drop(origin);

        block_on(async {
            let mut text = String::new();
            rx.read_to_string(&mut text).await.unwrap();

            assert_eq!("hello".repeat(1000), text);
        })
    }

    /// Asynchronous writers on a thousand threads feed one synchronous reader.
    #[test]
    fn thread_reader_case() {
        use std::io::Read;

        let (origin, mut rx) = crate::async_writer_pipe();
        for _ in 0..1000 {
            let mut producer = origin.clone();
            spawn(move || {
                block_on(async {
                    producer.write_all("hello".as_bytes()).await.unwrap();
                })
            });
        }
        drop(origin);

        let mut text = String::new();
        rx.read_to_string(&mut text).unwrap();

        assert_eq!("hello".repeat(1000), text);
    }

    /// Each round spawns a thread that writes four bytes and then reads four
    /// bytes back on the current thread.
    #[test]
    fn threads_write_and_read_case() {
        let (origin, mut rx) = crate::async_pipe();

        for _ in 0..1000 {
            let mut producer = origin.clone();

            spawn(move || {
                block_on(async {
                    producer.write_all(&[0; 4]).await.unwrap();
                })
            });

            block_on(async {
                let mut chunk = [0; 4];
                let got = rx.read(&mut chunk).await.unwrap();
                assert_eq!(chunk.len(), got);
            })
        }
        drop(origin);
    }
}