pub struct TcpStreamBuf {
    pub out: Vec<u8>,
    pub out_eof: bool,
    pub inp: Vec<u8>,
    pub inp_eof: bool,
    pub rd: usize,
    pub wr: usize,
    /* private fields */
}

Type to aid with managing a mio::net::TcpStream along with MioPoll

First create the stream and add it to MioPoll, then pass the resulting MioSource to TcpStreamBuf::init, which allows this struct to manage the buffering. Your ready handler should call TcpStreamBuf::flush when the socket is WRITABLE, and TcpStreamBuf::read when the socket is READABLE, although you might want to delay some of the read calls using stakker::idle! if you wish to implement backpressure.

By default, TCP_NODELAY is set to false on the stream. However, if you’re writing large chunks of data, it is recommended to enable TCP_NODELAY by calling TcpStreamBuf::set_nodelay with true. This disables the Nagle algorithm, so the last packet of a large write arrives sooner at the destination. If you are writing very small amounts and would benefit from the Nagle algorithm batching up data despite the extra round-trip, it’s fine to leave it as false. Because on Windows the TCP_NODELAY flag can only be changed once the stream is writable, it is set on the first flush after each new stream is installed.

The output buffer is a simple Vec, because the pattern of behaviour is expected to be that it will generally be flushed out in its entirety. However the input buffer is a pre-filled Vec with separate read and write offsets. Data is received from the TCP stream to advance the write offset. It is expected that data will be grabbed from the read offset in the buffer by the application as soon as an entire line or entire record is available (depending on the protocol), advancing the read offset. This means it will often be necessary to leave an incomplete line or record in the buffer until more data has been read from the TCP stream to complete it. So using offsets saves some copying.
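The offset scheme described above can be sketched without any networking. The following is a minimal illustration only: LineBuf is a hypothetical stand-in that models just the inp, rd and wr fields, and its new, receive and next_line methods are invented for this sketch; they are not part of TcpStreamBuf.

```rust
/// Hypothetical stand-in for the input side of TcpStreamBuf. Only the
/// `inp`, `rd` and `wr` fields come from the documentation; the methods
/// below are assumptions made for illustration.
pub struct LineBuf {
    pub inp: Vec<u8>,
    pub rd: usize,
    pub wr: usize,
}

impl LineBuf {
    /// Create a buffer pre-filled with `size` zero bytes.
    pub fn new(size: usize) -> Self {
        Self { inp: vec![0; size], rd: 0, wr: 0 }
    }

    /// Simulate data arriving from the socket: copy it in at `wr`
    /// and advance the write offset.
    pub fn receive(&mut self, data: &[u8]) {
        let end = self.wr + data.len();
        if end > self.inp.len() {
            self.inp.resize(end, 0);
        }
        self.inp[self.wr..end].copy_from_slice(data);
        self.wr = end;
    }

    /// Return the next complete line (without the '\n') and advance `rd`
    /// past it. An incomplete line is left in place until more data arrives.
    pub fn next_line(&mut self) -> Option<Vec<u8>> {
        let pending = &self.inp[self.rd..self.wr];
        let pos = pending.iter().position(|&b| b == b'\n')?;
        let line = pending[..pos].to_vec();
        self.rd += pos + 1;
        Some(line)
    }
}
```

With this stand-in, receiving b"hello\nwor" yields one line and leaves the three bytes of "wor" between rd and wr; nothing is copied or shifted until the rest of the line arrives.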

Fields

out: Vec<u8>

Output buffer. Append data here, and then call TcpStreamBuf::flush when ready to send. If the stream is receiving backpressure from the remote end then you’ll see data here building up.

TcpStreamBuf has a Write trait implementation which may be used to write data to this buffer. Flushing via the Write trait does not flush to the TCP end-point, though.
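As a rough illustration of that behaviour, the sketch below uses a hypothetical stand-in type, OutBuf, which models only the out field and implements Write in the way described here. The queue_response helper and its message format are likewise invented for the example.

```rust
use std::io::{self, Write};

/// Hypothetical stand-in modelling only the `out` field of TcpStreamBuf.
pub struct OutBuf {
    pub out: Vec<u8>,
}

impl Write for OutBuf {
    /// Append the bytes to the `out` buffer; this never blocks.
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        self.out.extend_from_slice(buf);
        Ok(buf.len())
    }

    /// No-op: the target of the write is the buffer itself, not the socket.
    fn flush(&mut self) -> io::Result<()> {
        Ok(())
    }
}

/// Illustrative helper (not part of any library): queue a formatted
/// header followed by a body using the standard Write machinery.
pub fn queue_response(buf: &mut OutBuf, status: u16, body: &str) -> io::Result<()> {
    write!(buf, "STATUS {}\r\n", status)?;
    buf.write_all(body.as_bytes())?;
    // The trait-level flush leaves all queued data sitting in `out`.
    buf.flush()
}
```

On the real type, Rust's method resolution prefers inherent methods, so tcp.flush() would call TcpStreamBuf::flush, while Write::flush(&mut tcp) would be the trait's no-op.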

out_eof: bool

Output EOF flag. When this is set to true and the out buffer fully empties in a TcpStreamBuf::flush call, the outgoing half of the stream is shut down, which signals end-of-file to the remote end. If any data is added to out after this point, flush will return an error.

inp: Vec<u8>

Input buffer. To receive data, read from offset rd up to offset wr, updating the rd offset as you go. Call TcpStreamBuf::read to pull more data into the buffer, which will update the wr offset and possibly also the rd offset (to drop unneeded data before rd). To apply backpressure to the remote end, defer the read call using stakker::idle!.

TcpStreamBuf has a Read trait implementation which may be used to read data from this buffer, updating the rd offset accordingly.

inp_eof: bool

Input EOF flag. This is set by the TcpStreamBuf::read call when it returns ReadStatus::EndOfStream. The application should process the EOF only once it has finished reading any data remaining in the inp buffer.

rd: usize

Offset for reading in the input buffer

wr: usize

Offset for writing in the input buffer

Implementations

impl TcpStreamBuf

pub fn new() -> Self

Create a new empty TcpStreamBuf, without any stream currently associated

Examples found in repository?
examples/echo_server.rs (line 174)
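    fn setup(cx: CX![], stream: TcpStream) -> std::io::Result<Self> {
        // Register the stream with MioPoll for both read and write readiness,
        // forwarding ready-notifications to this actor's `ready` method
        let miopoll = cx.anymap_get::<MioPoll>();
        let source = miopoll.add(
            stream,
            Interest::READABLE | Interest::WRITABLE,
            10,
            fwd_to!([cx], ready() as (Ready)),
        )?;

        // Hand the resulting MioSource to TcpStreamBuf to manage the buffering
        let mut tcp = TcpStreamBuf::new();
        tcp.init(source);

        Ok(Self { tcp })
    }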

pub fn init(&mut self, stream: MioSource<TcpStream>)

After adding a stream to the MioPoll instance with MioPoll::add, store the MioSource here to handle the buffering. TcpStreamBuf takes care of deregistering the stream on drop. The caller should probably call TcpStreamBuf::flush and TcpStreamBuf::read soon after this call.

Examples found in repository?
examples/echo_server.rs (line 175): the same setup function shown above under TcpStreamBuf::new.

pub fn deinit(&mut self)

Discard the current stream if there is one, deregistering it from the MioPoll instance.

pub fn set_nodelay(&mut self, nodelay: bool)

Change the TCP_NODELAY setting for the stream. Passing true disables the Nagle algorithm. The change will be applied on the next flush.

pub fn pause_writes(&mut self, pause: bool)

Pause or unpause writes made by the TcpStreamBuf::flush call. If pause is set to true, then TcpStreamBuf::flush does nothing. Initially it is set to false which allows writes. This may be used on Windows where writes will fail if attempted before the first “ready for write” indication is received from mio.

pub fn flush(&mut self) -> Result<()>

Flush as much data as possible out to the stream

Examples found in repository?
examples/echo_server.rs (line 219)
    fn flush(&mut self, cx: CX![]) {
        if let Err(e) = self.tcp.flush() {
            fail!(cx, "Write failure on TCP stream: {}", e);
        }
        if self.tcp.out_eof && self.tcp.out.is_empty() {
            stop!(cx); // Stop actor when output is complete
        }
    }

pub fn inp_makespace(&mut self, max: usize)

Make space in the inp Vec to read in up to max bytes of data in addition to whatever is already there. Tries to avoid unnecessary copies.
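The documentation does not say how copies are avoided, so the following is only one plausible strategy, written against a hypothetical stand-in type (SpaceBuf) rather than the real implementation: rewind the offsets for free when the buffer is drained, slide unread bytes to the front only when the tail is too short, and grow the Vec as a last resort.

```rust
/// Hypothetical stand-in modelling the `inp`, `rd` and `wr` fields.
/// Invariant assumed throughout: rd <= wr <= inp.len().
pub struct SpaceBuf {
    pub inp: Vec<u8>,
    pub rd: usize,
    pub wr: usize,
}

impl SpaceBuf {
    /// Ensure at least `max` writable bytes exist after `wr`.
    /// This is an assumed strategy, not the library's actual code.
    pub fn makespace(&mut self, max: usize) {
        // Nothing unread: rewinding both offsets costs no copy at all.
        if self.rd == self.wr {
            self.rd = 0;
            self.wr = 0;
        }
        // Enough room already at the tail: leave everything in place.
        if self.inp.len() - self.wr >= max {
            return;
        }
        // Reclaim consumed space by sliding the unread bytes to the front.
        if self.rd > 0 {
            self.inp.copy_within(self.rd..self.wr, 0);
            self.wr -= self.rd;
            self.rd = 0;
        }
        // Still short: grow the pre-filled Vec.
        let needed = self.wr + max;
        if self.inp.len() < needed {
            self.inp.resize(needed, 0);
        }
    }
}
```

In this sketch a copy happens only when consumed bytes in front of rd are blocking a read of max bytes, which fits the stated goal of avoiding unnecessary copies.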

pub fn read(&mut self, max: usize) -> ReadStatus

Read more data and append it to the data currently in the inp buffer. This is non-blocking. Bytes before the rd offset might be dropped from the buffer, and rd might be moved. No more than max bytes are read, which allows regulating the data input rate if that is required. If you need to apply backpressure when under load, call this method from a stakker::idle! handler. This must be called repeatedly until it returns ReadStatus::WouldBlock in order to get another READABLE ready-notification from mio.

Examples found in repository?
examples/echo_server.rs (line 183)
    fn ready(&mut self, cx: CX![], ready: Ready) {
        if ready.is_readable() {
            // Keep reading until something other than NewData is returned,
            // so that mio will deliver the next READABLE notification
            loop {
                match self.tcp.read(8192) {
                    ReadStatus::NewData => {
                        // Take everything between rd and wr, then mark it consumed
                        let data = self.tcp.inp[self.tcp.rd..self.tcp.wr].to_vec();
                        self.tcp.rd = self.tcp.wr;
                        self.check_special_chars(cx, &data);
                        after!(Duration::from_secs(1), [cx], send_data(data));
                        continue;
                    }
                    ReadStatus::WouldBlock => (), // Nothing more to read for now
                    ReadStatus::EndOfStream => {
                        after!(Duration::from_secs(1), [cx], send_eof());
                    }
                    ReadStatus::Error(e) => {
                        fail!(cx, "Read failure on TCP stream: {}", e);
                    }
                }
                break;
            }
        }

        if ready.is_writable() {
            self.flush(cx);
        }
    }

pub fn exchange(&mut self, upstream: &mut Self)

Transfer outgoing data to the upstream TcpStreamBuf, and pull incoming data down from the upstream TcpStreamBuf. Also passes through EOF flags both ways.

Trait Implementations

impl Default for TcpStreamBuf

fn default() -> TcpStreamBuf

Returns the “default value” for a type.

impl Read for TcpStreamBuf

fn read(&mut self, buf: &mut [u8]) -> Result<usize>

Read data from the inp buffer, advancing the rd offset. If there is no data available in inp, returns Ok(0) if the EOF has been reached, otherwise Err(ErrorKind::WouldBlock.into())
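The three outcomes described above (data, end-of-file, would-block) can be sketched with a hypothetical stand-in type. ReadBuf below models only the inp, inp_eof, rd and wr fields and is an illustration of the documented behaviour, not the library's code.

```rust
use std::io::{self, ErrorKind, Read};

/// Hypothetical stand-in modelling the input-side fields of TcpStreamBuf.
pub struct ReadBuf {
    pub inp: Vec<u8>,
    pub inp_eof: bool,
    pub rd: usize,
    pub wr: usize,
}

impl Read for ReadBuf {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        let avail = &self.inp[self.rd..self.wr];
        if avail.is_empty() {
            // No buffered data: Ok(0) means EOF, otherwise the caller
            // should wait for more data to arrive.
            return if self.inp_eof {
                Ok(0)
            } else {
                Err(ErrorKind::WouldBlock.into())
            };
        }
        // Copy as much as fits and advance the read offset past it.
        let n = avail.len().min(buf.len());
        buf[..n].copy_from_slice(&avail[..n]);
        self.rd += n;
        Ok(n)
    }
}
```

Because WouldBlock is surfaced as an error, adapters such as read_to_end would stop with that error on a stream that has not yet reached EOF, so callers of this kind of reader need to handle it explicitly.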

fn read_vectored(&mut self, bufs: &mut [IoSliceMut<'_>]) -> Result<usize, Error>

Like read, except that it reads into a slice of buffers.

fn is_read_vectored(&self) -> bool

🔬 This is a nightly-only experimental API. (can_vector)
Determines if this Reader has an efficient read_vectored implementation.

fn read_to_end(&mut self, buf: &mut Vec<u8, Global>) -> Result<usize, Error>

Read all bytes until EOF in this source, placing them into buf.

fn read_to_string(&mut self, buf: &mut String) -> Result<usize, Error>

Read all bytes until EOF in this source, appending them to buf.

fn read_exact(&mut self, buf: &mut [u8]) -> Result<(), Error>

Read the exact number of bytes required to fill buf.

fn read_buf(&mut self, buf: BorrowedCursor<'_>) -> Result<(), Error>

🔬 This is a nightly-only experimental API. (read_buf)
Pull some bytes from this source into the specified buffer.

fn read_buf_exact(&mut self, cursor: BorrowedCursor<'_>) -> Result<(), Error>

🔬 This is a nightly-only experimental API. (read_buf)
Read the exact number of bytes required to fill cursor.

fn by_ref(&mut self) -> &mut Self where Self: Sized

Creates a “by reference” adaptor for this instance of Read.

fn bytes(self) -> Bytes<Self> where Self: Sized

Transforms this Read instance to an Iterator over its bytes.

fn chain<R>(self, next: R) -> Chain<Self, R> where R: Read, Self: Sized

Creates an adapter which will chain this stream with another.

fn take(self, limit: u64) -> Take<Self> where Self: Sized

Creates an adapter which will read at most limit bytes from it.

impl Write for TcpStreamBuf

fn write(&mut self, buf: &[u8]) -> Result<usize>

Write data into the out buffer

fn flush(&mut self) -> Result<()>

Flush does nothing because we consider the end-target of the write to be the TcpStreamBuf::out buffer

fn write_vectored(&mut self, bufs: &[IoSlice<'_>]) -> Result<usize, Error>

Like write, except that it writes from a slice of buffers.

fn is_write_vectored(&self) -> bool

🔬 This is a nightly-only experimental API. (can_vector)
Determines if this Writer has an efficient write_vectored implementation.

fn write_all(&mut self, buf: &[u8]) -> Result<(), Error>

Attempts to write an entire buffer into this writer.

fn write_all_vectored(&mut self, bufs: &mut [IoSlice<'_>]) -> Result<(), Error>

🔬 This is a nightly-only experimental API. (write_all_vectored)
Attempts to write multiple buffers into this writer.

fn write_fmt(&mut self, fmt: Arguments<'_>) -> Result<(), Error>

Writes a formatted string into this writer, returning any error encountered.

fn by_ref(&mut self) -> &mut Self where Self: Sized

Creates a “by reference” adapter for this instance of Write.

Auto Trait Implementations

Blanket Implementations

impl<T> Any for T where T: 'static + ?Sized

fn type_id(&self) -> TypeId

Gets the TypeId of self.

impl<T> Borrow<T> for T where T: ?Sized

fn borrow(&self) -> &T

Immutably borrows from an owned value.

impl<T> BorrowMut<T> for T where T: ?Sized

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value.

impl<T> From<T> for T

fn from(t: T) -> T

Returns the argument unchanged.

impl<T, U> Into<U> for T where U: From<T>

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

impl<T, U> TryFrom<U> for T where U: Into<T>

type Error = Infallible

The type returned in the event of a conversion error.

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.

impl<T, U> TryInto<U> for T where U: TryFrom<T>

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.

impl<T> Any for T where T: Any