// pipebuf_mio/tcpstream.rs

1use mio::net::TcpStream;
2use pipebuf::PBufRdWr;
3use std::io::{ErrorKind, Result};
4
/// Exchange stream data via the `mio` [`TcpStream`] type
///
/// For the TCP incoming stream both TCP "close" (FIN) and "abort"
/// (RST) are detected and passed on.  For the TCP outgoing stream,
/// "close" handling is mapped to a normal shutdown on the outgoing
/// half of the TCP stream.  The incoming half of the TCP stream, if
/// still open, remains open until the other end closes, as expected.
///
/// For TCP outgoing "abort", ideally we'd generate a TCP RST to tear
/// things down at both ends as soon as possible.  This can be done
/// with `set_linger(Some(0))` and a close.  However the linger API is
/// not yet stable on `std`, and is not present at all in `mio`.  So
/// on "abort", this code does a normal shutdown on both incoming and
/// outgoing TCP streams, and does an "abort" on the side of the pipe
/// for incoming TCP data.  This should cause rapid shutdown of things
/// locally.  The remote end however will not know that this is an
/// abort.  Linger-based handling of outgoing "abort" may be added
/// later as a runtime option once it is stable in the APIs.
///
/// To start with both reading and writing via the TCP stream are
/// paused.  This is because, depending on the platform, reading or
/// writing may give an error if a "ready" indication has not yet been
/// received.  So call `set_pause_writes(false)` or
/// `set_pause_reads(false)` as soon as the stream indicates "ready"
/// in order to allow data to flow.
pub struct TcpLink {
    // Maximum amount of data to read in one go (in bytes); bounds
    // how much is pulled into the pipe buffer per `process_in` call
    max_read_unit: usize,

    // Desired TCP_NODELAY flag; applied to the stream lazily (see
    // `pending_set_nodelay`)
    nodelay: bool,

    // Set to pause writes (waiting for first "ready" indication)
    pause_writes: bool,

    // Set to pause reads (waiting for first "ready" indication)
    pause_reads: bool,

    // True when `nodelay` has changed but `set_nodelay()` has not
    // yet been applied to the stream; applied on the next unpaused
    // `process_out` pass
    pending_set_nodelay: bool,
}
46
impl TcpLink {
    /// Create the component with default settings:
    ///
    /// - **max_read_unit** of 2048.  This is bigger than a typical IP
    /// packet's data load but you may want to increase this.
    ///
    /// - **nodelay** set to `false`, i.e. using the Nagle algorithm to
    /// delay output to attempt to batch up data into fewer IP packets
    ///
    /// - Both reads and writes paused
    #[inline]
    pub fn new() -> Self {
        Self {
            max_read_unit: 2048,
            nodelay: false,
            pause_writes: true,
            pause_reads: true,
            pending_set_nodelay: false,
        }
    }

    /// Change the maximum number of bytes to read in each `process`
    /// call.  This allows managing how much data you wish to handle
    /// at a time, to allow the possibility of backpressure, and to
    /// control how large the pipe buffers in your processing chain
    /// will grow.  If memory is not an issue, then there is no
    /// problem with setting this large, which will likely give higher
    /// efficiency if there is a lot of data queued.
    #[inline]
    pub fn set_max_read_unit(&mut self, max_read_unit: usize) {
        self.max_read_unit = max_read_unit;
    }

    /// Change the "no delay" flag on the stream.  This will be
    /// updated on the next `process` call on which writes are not
    /// paused.
    ///
    /// Use `true` if you wish to disable the Nagle algorithm, e.g. if
    /// you are sending large chunks or packets of data and wish them
    /// to be sent immediately, even if that means using a part-filled
    /// IP packet for the last part of the data.
    ///
    /// Use `false` if you are sending byte data a few bytes at a time
    /// (e.g. terminal interaction) and you'd rather that TCP waited a
    /// moment to attempt to batch up data together to reduce the
    /// number of IP packets sent.  For the last part of the data it
    /// may add a delay of a network round-trip.
    #[inline]
    pub fn set_nodelay(&mut self, nodelay: bool) {
        // Only mark the change pending if the value actually differs,
        // to avoid a redundant syscall on the next `process` pass
        if self.nodelay != nodelay {
            self.nodelay = nodelay;
            self.pending_set_nodelay = true
        }
    }

    /// Pause or unpause writes.  This takes effect on the next
    /// `process` call.
    #[inline]
    pub fn set_pause_writes(&mut self, pause: bool) {
        self.pause_writes = pause;
    }

    /// Pause or unpause reads.  This takes effect on the next
    /// `process` call.
    #[inline]
    pub fn set_pause_reads(&mut self, pause: bool) {
        self.pause_reads = pause;
    }

    /// Read and write as much data as possible to and from the given
    /// TCP stream.  Returns the activity status: `Ok(true)` if
    /// something changed, `Ok(false)` if no progress could be made,
    /// or `Err(_)` if there was a fatal error on the stream.
    ///
    /// Assumes that it is always called with the same TcpStream and
    /// pipe-buffer.  Things will behave unpredictably otherwise.
    pub fn process(&mut self, stream: &mut TcpStream, mut pbuf: PBufRdWr) -> Result<bool> {
        // Note: a fatal error from the outgoing side skips the
        // incoming side entirely; the caller is expected to abandon
        // the stream on `Err`
        let rd_activity = self.process_out(stream, pbuf.reborrow())?;
        let wr_activity = self.process_in(stream, pbuf.reborrow())?;
        Ok(rd_activity || wr_activity)
    }

    /// Write as much data as possible out to the given TCP stream.
    /// Returns the activity status: `Ok(true)` if something changed,
    /// `Ok(false)` if no progress could be made, or `Err(_)` if there
    /// was a fatal error on the stream.
    ///
    /// Assumes that it is always called with the same TcpStream and
    /// pipe-buffer.  Things will behave unpredictably otherwise.
    pub fn process_out(&mut self, stream: &mut TcpStream, mut pbuf: PBufRdWr) -> Result<bool> {
        if self.pause_writes {
            return Ok(false);
        }

        // Apply any deferred TCP_NODELAY change now that writes are
        // known to be unpaused (the stream should be "ready")
        if self.pending_set_nodelay {
            self.pending_set_nodelay = false;
            retry!(stream.set_nodelay(self.nodelay))?;
        }

        // TcpStream::flush() does nothing as it does write() syscalls
        // directly (which don't buffer).  So there is no need to give
        // the option to force flushes.
        let mut prd = pbuf.rd;
        // Tripwire snapshots the buffer state so any consumption or
        // EOF handling below registers as "activity" in the return
        let trip = prd.tripwire();
        match prd.output_to(stream, false) {
            // WouldBlock just means no more can be written right now
            Err(ref e) if e.kind() == ErrorKind::WouldBlock => (),
            Err(e) => return Err(e),
            Ok(_) => {
                // All buffered data written; if the pipe signalled
                // EOF, map it onto a TCP shutdown
                if prd.is_empty() && prd.has_pending_eof() {
                    let shutdown = if prd.is_aborted() {
                        // Outgoing "abort": no stable linger API, so
                        // shut down both halves and abort the
                        // incoming pipe (see type-level docs)
                        if !pbuf.wr.is_eof() {
                            pbuf.wr.abort();
                        }
                        std::net::Shutdown::Both
                    } else {
                        // Normal close: shut down only our outgoing
                        // half; the peer may still send data
                        std::net::Shutdown::Write
                    };
                    match retry!(stream.shutdown(shutdown)) {
                        // If shutdown would block, leave the EOF
                        // pending and retry on a later call
                        Err(ref e) if e.kind() == ErrorKind::WouldBlock => (),
                        Err(e) => return Err(e),
                        Ok(_) => {
                            prd.consume_eof();
                        }
                    }
                }
            }
        }
        Ok(prd.is_tripped(trip))
    }

    /// Read as much data as possible from the given TCP stream, up
    /// to **max_read_unit** bytes.  Returns the activity status:
    /// `Ok(true)` if something changed, `Ok(false)` if no progress
    /// could be made, or `Err(_)` if there was a fatal error on the
    /// stream.
    ///
    /// Assumes that it is always called with the same TcpStream and
    /// pipe-buffer.  Things will behave unpredictably otherwise.
    pub fn process_in(&mut self, stream: &mut TcpStream, pbuf: PBufRdWr) -> Result<bool> {
        let mut pwr = pbuf.wr;
        // Nothing to do if reads are paused or the incoming pipe has
        // already seen EOF
        if self.pause_reads || pwr.is_eof() {
            return Ok(false);
        }

        let trip = pwr.tripwire();
        if let Err(e) = pwr.input_from(stream, self.max_read_unit) {
            match e.kind() {
                // TCP RST from the peer: pass the "abort" on through
                // the pipe rather than failing the whole link
                ErrorKind::ConnectionReset | ErrorKind::ConnectionAborted => pwr.abort(),
                // Nothing available right now; not an error
                ErrorKind::WouldBlock => (),
                _ => return Err(e),
            }
        }
        Ok(pwr.is_tripped(trip))
    }
}
201
202impl Default for TcpLink {
203    fn default() -> Self {
204        Self::new()
205    }
206}