pipebuf_mio/tcpstream.rs
1use mio::net::TcpStream;
2use pipebuf::PBufRdWr;
3use std::io::{ErrorKind, Result};
4
/// Exchange stream data via the `mio` [`TcpStream`] type
///
/// For the TCP incoming stream both TCP "close" (FIN) and "abort"
/// (RST) are detected and passed on.  For the TCP outgoing stream,
/// "close" handling is mapped to a normal shutdown on the outgoing
/// half of the TCP stream.  The incoming half of the TCP stream, if
/// still open, remains open until the other end closes, as expected.
///
/// For TCP outgoing "abort", ideally we'd generate a TCP RST to tear
/// things down at both ends as soon as possible.  This can be done
/// with `set_linger(Some(0))` and a close.  However the linger API
/// is not yet stable on `std`, and is not present at all in `mio`.
/// So on "abort", this code does a normal shutdown on both incoming
/// and outgoing TCP streams, and does an "abort" on the side of the
/// pipe for incoming TCP data.  This should cause rapid shutdown of
/// things locally.  The remote end however will not know that this
/// is an abort.  Linger-based handling of outgoing "abort" may be
/// added later as a runtime option once it is stable in the APIs.
///
/// To start with, both reading and writing via the TCP stream are
/// paused.  This is because, depending on the platform, reading or
/// writing may give an error if a "ready" indication has not yet
/// been received.  So call `set_pause_writes(false)` or
/// `set_pause_reads(false)` as soon as the stream indicates "ready"
/// in order to allow data to flow.
///
/// The type implements `Debug` so that the current configuration and
/// pause state can be logged or included in the `Debug` output of an
/// enclosing structure.
#[derive(Debug)]
pub struct TcpLink {
    // Maximum amount of data to read in one go (in bytes)
    max_read_unit: usize,

    // TCP_NODELAY flag: the most recently requested setting
    nodelay: bool,

    // Set to pause writes (waiting for first "ready" indication)
    pause_writes: bool,

    // Set to pause reads (waiting for first "ready" indication)
    pause_reads: bool,

    // Set when `nodelay` has changed but the new value has not yet
    // been applied to the stream
    pending_set_nodelay: bool,
}
46
47impl TcpLink {
48 /// Create the component with default settings:
49 ///
50 /// - **max_read_unit** of 2048. This is bigger than a typical IP
51 /// packet's data load but you may want to increase this.
52 ///
53 /// - **nodelay** set to `false`, i.e. using the Nagle algorithm to
54 /// delay output to attempt to batch up data into fewer IP packets
55 ///
56 /// - Both reads and writes paused
57 #[inline]
58 pub fn new() -> Self {
59 Self {
60 max_read_unit: 2048,
61 nodelay: false,
62 pause_writes: true,
63 pause_reads: true,
64 pending_set_nodelay: false,
65 }
66 }
67
68 /// Change the maximum number of bytes to read in each `process`
69 /// call. This allows managing how much data you wish to handle
70 /// at a time, to allow the possibility of backpressure, and to
71 /// control how large the pipe buffers in your processing chain
72 /// will grow. If memory is not an issue, then there is no
73 /// problem with setting this large, which will likely give higher
74 /// efficiency if there is a lot of data queued.
75 #[inline]
76 pub fn set_max_read_unit(&mut self, max_read_unit: usize) {
77 self.max_read_unit = max_read_unit;
78 }
79
80 /// Change the "no delay" flag on the stream. This will be
81 /// updated on the next `process` call.
82 ///
83 /// Use `true` if you wish to disable the Nagle algorithm, e.g. if
84 /// you are sending large chunks or packets of data and wish them
85 /// to be sent immediately, even if that means using a part-filled
86 /// IP packet for the last part of the data.
87 ///
88 /// Use `false` if you are sending byte data a few bytes at a time
89 /// (e.g. terminal interaction) and you'd rather that TCP waited a
90 /// moment to attempt to batch up data together to reduce the
91 /// number of IP packets sent. For the last part of the data it
92 /// may add a delay of a network round-trip.
93 #[inline]
94 pub fn set_nodelay(&mut self, nodelay: bool) {
95 if self.nodelay != nodelay {
96 self.nodelay = nodelay;
97 self.pending_set_nodelay = true
98 }
99 }
100
101 /// Pause or unpause writes. This takes effect on the next
102 /// `process` call.
103 #[inline]
104 pub fn set_pause_writes(&mut self, pause: bool) {
105 self.pause_writes = pause;
106 }
107
108 /// Pause or unpause reads. This takes effect on the next
109 /// `process` call.
110 #[inline]
111 pub fn set_pause_reads(&mut self, pause: bool) {
112 self.pause_reads = pause;
113 }
114
115 /// Read and write as much data as possible to and from the given
116 /// TCP stream. Returns the activity status: `Ok(true)` if
117 /// something changed, `Ok(false)` if no progress could be made,
118 /// or `Err(_)` if there was a fatal error on the stream.
119 ///
120 /// Assumes that it is always called with the same TcpStream and
121 /// pipe-buffer. Things will behave unpredictably otherwise.
122 pub fn process(&mut self, stream: &mut TcpStream, mut pbuf: PBufRdWr) -> Result<bool> {
123 let rd_activity = self.process_out(stream, pbuf.reborrow())?;
124 let wr_activity = self.process_in(stream, pbuf.reborrow())?;
125 Ok(rd_activity || wr_activity)
126 }
127
128 /// Write as much data as possible out to the given TCP stream.
129 /// Returns the activity status: `Ok(true)` if something changed,
130 /// `Ok(false)` if no progress could be made, or `Err(_)` if there
131 /// was a fatal error on the stream.
132 ///
133 /// Assumes that it is always called with the same TcpStream and
134 /// pipe-buffer. Things will behave unpredictably otherwise.
135 pub fn process_out(&mut self, stream: &mut TcpStream, mut pbuf: PBufRdWr) -> Result<bool> {
136 if self.pause_writes {
137 return Ok(false);
138 }
139
140 if self.pending_set_nodelay {
141 self.pending_set_nodelay = false;
142 retry!(stream.set_nodelay(self.nodelay))?;
143 }
144
145 // TcpStream::flush() does nothing as it does write() syscalls
146 // directly (which don't buffer). So there is no need to give
147 // the option to force flushes.
148 let mut prd = pbuf.rd;
149 let trip = prd.tripwire();
150 match prd.output_to(stream, false) {
151 Err(ref e) if e.kind() == ErrorKind::WouldBlock => (),
152 Err(e) => return Err(e),
153 Ok(_) => {
154 if prd.is_empty() && prd.has_pending_eof() {
155 let shutdown = if prd.is_aborted() {
156 if !pbuf.wr.is_eof() {
157 pbuf.wr.abort();
158 }
159 std::net::Shutdown::Both
160 } else {
161 std::net::Shutdown::Write
162 };
163 match retry!(stream.shutdown(shutdown)) {
164 Err(ref e) if e.kind() == ErrorKind::WouldBlock => (),
165 Err(e) => return Err(e),
166 Ok(_) => {
167 prd.consume_eof();
168 }
169 }
170 }
171 }
172 }
173 Ok(prd.is_tripped(trip))
174 }
175
176 /// Read as much data as possible from to the given TCP stream, up
177 /// to **max_read_unit** bytes. Returns the activity status:
178 /// `Ok(true)` if something changed, `Ok(false)` if no progress
179 /// could be made, or `Err(_)` if there was a fatal error on the
180 /// stream.
181 ///
182 /// Assumes that it is always called with the same TcpStream and
183 /// pipe-buffer. Things will behave unpredictably otherwise.
184 pub fn process_in(&mut self, stream: &mut TcpStream, pbuf: PBufRdWr) -> Result<bool> {
185 let mut pwr = pbuf.wr;
186 if self.pause_reads || pwr.is_eof() {
187 return Ok(false);
188 }
189
190 let trip = pwr.tripwire();
191 if let Err(e) = pwr.input_from(stream, self.max_read_unit) {
192 match e.kind() {
193 ErrorKind::ConnectionReset | ErrorKind::ConnectionAborted => pwr.abort(),
194 ErrorKind::WouldBlock => (),
195 _ => return Err(e),
196 }
197 }
198 Ok(pwr.is_tripped(trip))
199 }
200}
201
202impl Default for TcpLink {
203 fn default() -> Self {
204 Self::new()
205 }
206}