1use std::collections::VecDeque;
2use std::future::Future;
3use std::io::{BufRead, Error, ErrorKind, IoSlice, Write};
4use std::pin::Pin;
5use std::task::{Context, Poll};
6
7use futures_io::{AsyncBufRead, AsyncRead, AsyncWrite};
8use loole::{Receiver, RecvFuture, SendFuture, Sender, unbounded};
9
10use crate::state::SharedState;
11
12pub fn async_pipe() -> (AsyncWriter, AsyncReader) {
34 let (sender, receiver) = unbounded();
35 let state = SharedState::default();
36
37 (
38 AsyncWriter {
39 sender,
40 state: state.clone(),
41 write_state: None,
42 },
43 AsyncReader {
44 receiver,
45 state,
46 buf: VecDeque::new(),
47 reading: None,
48 },
49 )
50}
51
52#[cfg(feature = "async")]
74#[cfg(feature = "sync")]
75pub fn async_reader_pipe() -> (crate::Writer, AsyncReader) {
76 let (sender, receiver) = unbounded();
77 let state = SharedState::default();
78
79 (
80 crate::Writer {
81 sender,
82 state: state.clone(),
83 },
84 AsyncReader {
85 receiver,
86 state,
87 buf: VecDeque::new(),
88 reading: None,
89 },
90 )
91}
92
93#[cfg(feature = "async")]
115#[cfg(feature = "sync")]
116pub fn async_writer_pipe() -> (AsyncWriter, crate::Reader) {
117 let (sender, receiver) = unbounded();
118 let state = SharedState::default();
119
120 (
121 AsyncWriter {
122 sender,
123 state: state.clone(),
124 write_state: None,
125 },
126 crate::Reader {
127 receiver,
128 state,
129 buf: VecDeque::new(),
130 },
131 )
132}
133
134#[derive(Debug)]
135struct WriteState {
136 send_future: SendFuture<()>,
137 n: usize,
138}
139
140#[derive(Debug)]
144pub struct AsyncWriter {
145 sender: Sender<()>,
146 write_state: Option<WriteState>,
147 state: SharedState,
148}
149
150impl Clone for AsyncWriter {
151 fn clone(&self) -> Self {
152 Self {
153 sender: self.sender.clone(),
154 write_state: None,
155 state: self.state.clone(),
156 }
157 }
158}
159
160impl AsyncWriter {
161 fn poll_send(&mut self, cx: &mut Context<'_>) -> Poll<std::io::Result<usize>> {
162 debug_assert!(self.write_state.is_some());
163 match Pin::new(&mut self.write_state.as_mut().unwrap().send_future).poll(cx) {
164 Poll::Ready(Ok(_)) => {
165 let n = self.write_state.as_ref().map(|s| s.n).unwrap();
166 self.write_state = None;
167 Poll::Ready(Ok(n))
168 }
169 Poll::Ready(Err(e)) => {
170 self.write_state = None;
171 Poll::Ready(Err(Error::new(ErrorKind::WriteZero, e)))
172 }
173 Poll::Pending => Poll::Pending,
174 }
175 }
176}
177
178impl AsyncWrite for AsyncWriter {
179 fn poll_write(
180 mut self: Pin<&mut Self>,
181 cx: &mut Context<'_>,
182 buf: &[u8],
183 ) -> Poll<std::io::Result<usize>> {
184 if self.write_state.is_none() {
185 self.write_state = Some(WriteState {
186 send_future: self.sender.send_async(()),
187 n: self.state.write(buf)?,
188 });
189 }
190 match self.poll_send(cx)? {
191 Poll::Ready(n) => Poll::Ready(Ok(n)),
192 Poll::Pending => Poll::Pending,
193 }
194 }
195
196 fn poll_write_vectored(
197 mut self: Pin<&mut Self>,
198 cx: &mut Context<'_>,
199 bufs: &[IoSlice<'_>],
200 ) -> Poll<std::io::Result<usize>> {
201 if self.write_state.is_none() {
202 self.write_state = Some(WriteState {
203 send_future: self.sender.send_async(()),
204 n: self.state.write_vectored(bufs)?,
205 });
206 }
207 match self.poll_send(cx)? {
208 Poll::Ready(n) => Poll::Ready(Ok(n)),
209 Poll::Pending => Poll::Pending,
210 }
211 }
212
213 fn poll_flush(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
214 Poll::Ready(self.state.flush())
215 }
216
217 fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
218 self.sender.close();
219 Poll::Ready(Ok(()))
220 }
221}
222
223#[derive(Debug)]
227pub struct AsyncReader {
228 receiver: Receiver<()>,
229 buf: VecDeque<u8>,
230 reading: Option<RecvFuture<()>>,
231 state: SharedState,
232}
233
234impl AsyncBufRead for AsyncReader {
235 fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<&[u8]>> {
236 let this = self.get_mut();
237 while this.buf.is_empty() {
238 let n = this.state.copy_to(&mut this.buf)?;
239 if n == 0 {
240 if this.reading.is_none() {
241 this.reading = Some(this.receiver.recv_async())
242 }
243
244 match Pin::new(this.reading.as_mut().unwrap()).poll(cx) {
245 Poll::Ready(Ok(_)) => {
246 this.reading = None;
247 }
248 Poll::Ready(Err(_)) => {
249 this.reading = None;
250 break;
251 }
252 Poll::Pending => return Poll::Pending,
253 }
254 }
255 }
256
257 if this.buf.is_empty() {
258 _ = this.state.copy_to(&mut this.buf)?;
259 }
260
261 Poll::Ready(this.buf.fill_buf())
262 }
263
264 fn consume(mut self: Pin<&mut Self>, amt: usize) {
265 self.buf.consume(amt)
266 }
267}
268
269impl AsyncRead for AsyncReader {
270 fn poll_read(
271 mut self: Pin<&mut Self>,
272 cx: &mut Context<'_>,
273 mut buf: &mut [u8],
274 ) -> Poll<std::io::Result<usize>> {
275 let data = match self.as_mut().poll_fill_buf(cx)? {
276 Poll::Ready(buf) => buf,
277 Poll::Pending => return Poll::Pending,
278 };
279 let n = buf.write(data)?;
280 self.consume(n);
281 Poll::Ready(Ok(n))
282 }
283}
284
285#[cfg(test)]
286mod tests {
287 use std::io::IoSlice;
288 use std::thread::spawn;
289
290 use futures::{
291 AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, StreamExt, TryStreamExt, executor::block_on,
292 };
293
294 #[test]
295 fn base_write_case() {
296 block_on(async {
297 let (mut writer, reader) = crate::async_pipe();
299 for _ in 0..1000 {
300 writer.write_all("hello".as_bytes()).await.unwrap();
301 }
302 drop(reader)
303 })
304 }
305
306 #[test]
307 fn base_read_case() {
308 block_on(async {
309 let (mut writer, mut reader) = crate::async_pipe();
310
311 writer.write_all("hello ".as_bytes()).await.unwrap();
312 writer.write_all("world".as_bytes()).await.unwrap();
313 drop(writer);
314
315 let mut str = String::new();
316 reader.read_to_string(&mut str).await.unwrap();
317
318 assert_eq!("hello world".to_string(), str);
319 });
320 }
321
322 #[test]
323 fn base_vectored_case() {
324 block_on(async {
325 let (mut writer, mut reader) = crate::async_pipe();
326 _ = writer
327 .write_vectored(&[
328 IoSlice::new("hello ".as_bytes()),
329 IoSlice::new("world".as_bytes()),
330 ])
331 .await
332 .unwrap();
333 drop(writer);
334
335 let mut str = String::new();
336 reader.read_to_string(&mut str).await.unwrap();
337
338 assert_eq!("hello world".to_string(), str);
339 });
340 }
341
342 #[test]
343 fn thread_case() {
344 block_on(async {
345 let (writer, mut reader) = crate::async_pipe();
346 let writers = (0..1000).map(|_| writer.clone()).collect::<Vec<_>>();
347 let writers_len = writers.len();
348 drop(writer);
349 let write_fut = futures::stream::iter(writers)
350 .map(|mut writer| async move { writer.write_all("hello".as_bytes()).await })
351 .buffer_unordered(writers_len)
352 .try_collect::<Vec<()>>();
353
354 let mut str = String::new();
355 let read_fut = reader.read_to_string(&mut str);
356 futures::join!(
357 async {
358 write_fut.await.unwrap();
359 },
360 async { read_fut.await.unwrap() }
361 );
362
363 assert_eq!("hello".repeat(writers_len), str);
364 });
365 }
366
367 #[test]
368 fn writer_err_case() {
369 block_on(async {
370 let (mut writer, reader) = crate::async_pipe();
371 drop(reader);
372
373 assert!(writer.write("hello".as_bytes()).await.is_err());
374 });
375 }
376
377 #[test]
378 fn bufread_case() {
379 block_on(async {
380 let (mut writer, mut reader) = crate::async_pipe();
381 writer.write_all("hello\n".as_bytes()).await.unwrap();
382 writer.write_all("world".as_bytes()).await.unwrap();
383 drop(writer);
384
385 let mut str = String::new();
386 assert_ne!(0, reader.read_line(&mut str).await.unwrap());
387 assert_eq!("hello\n".to_string(), str);
388
389 let mut str = String::new();
390 assert_ne!(0, reader.read_line(&mut str).await.unwrap());
391 assert_eq!("world".to_string(), str);
392
393 let mut str = String::new();
394 assert_eq!(0, reader.read_line(&mut str).await.unwrap());
395 });
396 }
397
398 #[test]
399 fn bufread_lines_case() {
400 block_on(async {
401 let (mut writer, reader) = crate::async_pipe();
402 writer.write_all("hello\n".as_bytes()).await.unwrap();
403 writer.write_all("world".as_bytes()).await.unwrap();
404 drop(writer);
405
406 assert_eq!(2, reader.lines().map(|l| assert!(l.is_ok())).count().await)
407 });
408 }
409
410 #[test]
411 fn thread_writer_case() {
412 use std::io::Write;
413
414 let (writer, mut reader) = crate::async_reader_pipe();
415 for _ in 0..1000 {
416 let mut writer = writer.clone();
417 spawn(move || {
418 writer.write_all("hello".as_bytes()).unwrap();
419 });
420 }
421 drop(writer);
422
423 block_on(async {
424 let mut str = String::new();
425 reader.read_to_string(&mut str).await.unwrap();
426
427 assert_eq!("hello".repeat(1000), str);
428 })
429 }
430
431 #[test]
432 fn thread_reader_case() {
433 use std::io::Read;
434
435 let (writer, mut reader) = crate::async_writer_pipe();
436 for _ in 0..1000 {
437 let mut writer = writer.clone();
438 spawn(move || {
439 block_on(async {
440 writer.write_all("hello".as_bytes()).await.unwrap();
441 })
442 });
443 }
444 drop(writer);
445
446 let mut str = String::new();
447 reader.read_to_string(&mut str).unwrap();
448
449 assert_eq!("hello".repeat(1000), str);
450 }
451
452 #[test]
453 fn threads_write_and_read_case() {
454 let (writer, mut reader) = crate::async_pipe();
455
456 for _ in 0..1000 {
457 let mut writer = writer.clone();
458
459 spawn(move || {
460 block_on(async {
461 writer.write_all(&[0; 4]).await.unwrap();
462 })
463 });
464
465 block_on(async {
466 let mut buf = [0; 4];
467 assert_eq!(buf.len(), reader.read(&mut buf).await.unwrap());
468 })
469 }
470 drop(writer);
471 }
472}