1pub mod ansi;
2mod convert;
3mod error;
4mod interactive;
5mod read;
6mod write;
7
8pub use convert::*;
9pub use error::*;
10pub use interactive::*;
11pub use read::*;
12
13use std::io::Error;
14use std::pin::Pin;
15use std::task::{Context, Poll};
16pub use write::*;
17
18use crate::io::PayloadAction;
19use pin_project_lite::pin_project;
20use std::time::Duration;
21use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
22
23use super::cache::*;
24
pin_project! {
    /// Couples a [`CacheReader`]-wrapped reader `R` with a writer `W`,
    /// together with a timeout and a block-size setting.
    ///
    /// `reader` and `writer` are marked `#[pin]`, so `project()` hands them
    /// out as pinned references; `timeout` and `block_size` are plain fields.
    #[derive(Debug)]
    pub struct Pipe<R,W> {
        // Read half, wrapped in a `CacheReader`.
        #[pin]
        reader: CacheReader<R>,
        // Write half, stored exactly as supplied.
        #[pin]
        writer: W,
        // Timeout setting.
        // NOTE(review): how it is applied is not visible in this file —
        // presumably by the `PipeRead` operations; confirm.
        timeout: Duration,
        // Block-size setting.
        // NOTE(review): presumably a byte count used when reading — confirm
        // against `PipeRead`.
        block_size: usize,
    }
}
36
37impl<R: AsyncRead, W> PipeRead for Pipe<R, W> {
38 fn get_timeout(&self) -> Duration {
39 self.timeout
40 }
41
42 fn set_timeout(&mut self, timeout: Duration) {
43 self.timeout = timeout;
44 }
45
46 fn get_block_size(&self) -> usize {
47 self.block_size
48 }
49
50 fn set_block_size(&mut self, block_size: usize) {
51 self.block_size = block_size;
52 }
53}
54
55impl<R, W: AsyncWrite> PipeWrite for Pipe<R, W> {}
56
57impl<R, W> Pipe<R, W>
58where
59 Self: PipeRead + PipeWrite + Send,
60{
61 pub async fn payload<T: PayloadAction>(
62 &mut self,
63 payload: T,
64 ) -> Result<T::ReturnType, PipeError>
65 where
66 Self: Unpin,
67 {
68 payload.execute(self).await
69 }
70}
71
72impl<R: AsyncRead, W> AsyncCacheRead for Pipe<R, W> {
73 fn poll_reader(
74 self: Pin<&mut Self>,
75 cx: &mut Context<'_>,
76 buf: &mut ReadBuf<'_>,
77 ) -> Poll<std::io::Result<()>> {
78 self.project().reader.poll_reader(cx, buf)
79 }
80
81 fn consume(self: Pin<&mut Self>, amt: usize) {
82 self.project().reader.consume(amt)
83 }
84
85 fn restore(self: Pin<&mut Self>, data: &[u8]) {
86 self.project().reader.restore(data)
87 }
88}
89
90impl<R: AsyncRead + Unpin, W: AsyncWrite + Unpin> Pipe<R, W> {
91 const DEFAULT_TIMEOUT: Duration = Duration::from_secs(1);
92 const DEFAULT_BLOCK_SIZE: usize = 4096;
93
94 pub fn new(reader: R, writer: W) -> Pipe<R, W> {
95 Pipe {
96 reader: CacheReader::new(reader), writer: writer,
98 block_size: Self::DEFAULT_BLOCK_SIZE,
99 timeout: Self::DEFAULT_TIMEOUT,
100 }
101 }
102}
103
104impl<R: AsyncRead, W> AsyncRead for Pipe<R, W> {
105 fn poll_read(
106 self: Pin<&mut Self>,
107 cx: &mut Context<'_>,
108 buf: &mut ReadBuf<'_>,
109 ) -> Poll<std::io::Result<()>> {
110 let this = self.project();
111 this.reader.poll_read(cx, buf)
112 }
113}
114
115impl<R, W: AsyncWrite> AsyncWrite for Pipe<R, W> {
116 fn poll_write(
117 self: Pin<&mut Self>,
118 cx: &mut Context<'_>,
119 buf: &[u8],
120 ) -> Poll<std::result::Result<usize, Error>> {
121 let this = self.project();
122 this.writer.poll_write(cx, buf)
123 }
124
125 fn poll_flush(
126 self: Pin<&mut Self>,
127 cx: &mut Context<'_>,
128 ) -> Poll<std::result::Result<(), Error>> {
129 let this = self.project();
130 this.writer.poll_flush(cx)
131 }
132
133 fn poll_shutdown(
134 self: Pin<&mut Self>,
135 cx: &mut Context<'_>,
136 ) -> Poll<std::result::Result<(), Error>> {
137 let this = self.project();
138 this.writer.poll_shutdown(cx)
139 }
140}