Struct stakker_mio::TcpStreamBuf
pub struct TcpStreamBuf {
    pub out: Vec<u8>,
    pub out_eof: bool,
    pub inp: Vec<u8>,
    pub inp_eof: bool,
    pub rd: usize,
    pub wr: usize,
    /* private fields */
}
Type to aid with managing a mio::net::TcpStream along with MioPoll
First create the stream and add it to MioPoll, then pass the resulting MioSource to TcpStreamBuf::init, which allows this struct to manage the buffering. Your ready handler should call TcpStreamBuf::flush when the socket is WRITABLE, and TcpStreamBuf::read when the socket is READABLE, although you might want to delay some of the read calls using stakker::idle! if you wish to implement backpressure.
By default TCP_NODELAY is set to false on the stream. However, if you’re writing large chunks of data, it is recommended to enable TCP_NODELAY by calling TcpStreamBuf::set_nodelay with true. This disables the Nagle algorithm, which means that the last packet of a large write will arrive sooner at the destination. However, if you are writing very small amounts, and would benefit from the Nagle algorithm batching up data despite the extra round-trip, then it’s fine to leave it as false.
Since on Windows the TCP_NODELAY flag can only be changed once the
stream is writable, it is set on the first flush after each new
stream is installed.
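To see the socket option itself in isolation, here is a minimal sketch using a plain std::net::TcpStream on the loopback interface rather than TcpStreamBuf. The helper name loopback_stream is made up for this illustration, and the sketch assumes loopback networking is available.

```rust
use std::io;
use std::net::{TcpListener, TcpStream};

/// Hypothetical helper: connect to a throwaway loopback listener.
/// The listener is returned too, so the connection stays open.
fn loopback_stream() -> io::Result<(TcpStream, TcpListener)> {
    let listener = TcpListener::bind("127.0.0.1:0")?;
    let stream = TcpStream::connect(listener.local_addr()?)?;
    Ok((stream, listener))
}

fn main() -> io::Result<()> {
    let (stream, _listener) = loopback_stream()?;
    // The Nagle algorithm is active by default, so TCP_NODELAY reads as false.
    println!("default nodelay: {}", stream.nodelay()?);
    // Setting TCP_NODELAY to true disables the Nagle algorithm.
    stream.set_nodelay(true)?;
    println!("after set_nodelay(true): {}", stream.nodelay()?);
    Ok(())
}
```

With TcpStreamBuf you would call set_nodelay on the buffer instead, and the option is applied at the next flush as described above.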
The output buffer is a simple Vec, because the pattern of behaviour is expected to be that it will generally be flushed out in its entirety. However, the input buffer is a pre-filled Vec with separate read and write offsets. Data is received from the TCP stream to advance the write offset. It is expected that data will be grabbed from the read offset in the buffer by the application as soon as an entire line or entire record is available (depending on the protocol), advancing the read offset. This means it will often be necessary to leave an incomplete line or record in the buffer until more data has been read from the TCP stream to complete it. So using offsets saves some copying.
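The offset scheme can be sketched with a small stand-in type. InBuf, receive and next_line are hypothetical names for this illustration only; they model the inp, rd and wr fields and are not part of the crate.

```rust
/// Hypothetical stand-in for the `inp`, `rd` and `wr` fields.
struct InBuf {
    inp: Vec<u8>, // pre-filled storage; only inp[rd..wr] holds unread data
    rd: usize,
    wr: usize,
}

impl InBuf {
    fn new(capacity: usize) -> Self {
        InBuf { inp: vec![0; capacity], rd: 0, wr: 0 }
    }

    /// Simulate data arriving from the socket: copy it in at `wr`.
    fn receive(&mut self, data: &[u8]) {
        let end = self.wr + data.len();
        if end > self.inp.len() {
            self.inp.resize(end, 0);
        }
        self.inp[self.wr..end].copy_from_slice(data);
        self.wr = end;
    }

    /// Take one complete line (without the '\n') if there is one,
    /// advancing `rd`. An incomplete trailing line is left in place.
    fn next_line(&mut self) -> Option<Vec<u8>> {
        let unread = &self.inp[self.rd..self.wr];
        let pos = unread.iter().position(|&b| b == b'\n')?;
        let line = unread[..pos].to_vec();
        self.rd += pos + 1;
        Some(line)
    }
}

fn main() {
    let mut buf = InBuf::new(16);
    buf.receive(b"hello\nwor");
    assert_eq!(buf.next_line().as_deref(), Some(&b"hello"[..]));
    assert_eq!(buf.next_line(), None); // "wor" is not a complete line yet
    buf.receive(b"ld\n");
    assert_eq!(buf.next_line().as_deref(), Some(&b"world"[..]));
}
```

The partial line "wor" is never moved or copied while waiting for the rest of the data; only the offsets change.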
Fields
out: Vec<u8>
Output buffer. Append data here, and then call TcpStreamBuf::flush when ready to send. If the stream is receiving backpressure from the remote end then you’ll see data here building up.
TcpStreamBuf has a Write trait implementation which may be used to write data to this buffer. Flushing via the Write trait does not flush to the TCP end-point, though.
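As a rough illustration of that Write behaviour, the sketch below uses a hypothetical OutBuf type (not the crate's type) whose Write implementation appends to an out Vec and whose flush is a no-op.

```rust
use std::io::{self, Write};

/// Hypothetical stand-in for the output side: a `Vec<u8>` plus a `Write` impl.
struct OutBuf {
    out: Vec<u8>,
}

impl Write for OutBuf {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        // Writing only appends to the in-memory buffer.
        self.out.extend_from_slice(buf);
        Ok(buf.len())
    }

    fn flush(&mut self) -> io::Result<()> {
        // Nothing to do: the buffer itself is the end-target of the write.
        Ok(())
    }
}

fn main() -> io::Result<()> {
    let mut tcp = OutBuf { out: Vec::new() };
    // Formatting macros work through the Write trait.
    write!(tcp, "GET {} HTTP/1.0\r\n\r\n", "/index.html")?;
    tcp.flush()?; // does not touch any socket
    assert_eq!(tcp.out, b"GET /index.html HTTP/1.0\r\n\r\n");
    Ok(())
}
```

With the real type, the buffered bytes only reach the socket when TcpStreamBuf::flush is called.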
out_eof: bool
Output EOF flag. When this is set to true and the out buffer fully empties in a TcpStreamBuf::flush call, the outgoing half of the stream will be shut down, which signals end-of-file. If any data is added to out after this point it will give an error from flush.
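What shutting down the outgoing half means at the TCP level can be sketched with plain std::net sockets on loopback. This is not the crate's code: half_close_demo is a hypothetical helper, and the sketch assumes loopback networking is available.

```rust
use std::io::{self, Read, Write};
use std::net::{Shutdown, TcpListener, TcpStream};

/// Hypothetical helper: send `msg`, shut down the outgoing half, and
/// return what the peer received plus the reply it could still send.
fn half_close_demo(msg: &[u8]) -> io::Result<(Vec<u8>, Vec<u8>)> {
    let listener = TcpListener::bind("127.0.0.1:0")?;
    let mut client = TcpStream::connect(listener.local_addr()?)?;
    let (mut server, _addr) = listener.accept()?;

    client.write_all(msg)?;
    // Closing only the write half is what signals end-of-file to the peer.
    client.shutdown(Shutdown::Write)?;

    // read_to_end returns because the peer sees EOF after the data.
    let mut seen = Vec::new();
    server.read_to_end(&mut seen)?;

    // The opposite direction is still open, so the peer can reply.
    server.write_all(b"ack")?;
    drop(server);
    let mut reply = Vec::new();
    client.read_to_end(&mut reply)?;
    Ok((seen, reply))
}

fn main() -> io::Result<()> {
    let (seen, reply) = half_close_demo(b"last words")?;
    assert_eq!(seen, b"last words");
    assert_eq!(reply, b"ack");
    Ok(())
}
```

This is why the application can set out_eof and still keep reading from inp until inp_eof is reported.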
inp: Vec<u8>
Input buffer. To receive data, read data from offset rd up to offset wr, updating the rd offset as you go. Call TcpStreamBuf::read to pull more data into the buffer, which will update the wr offset and also possibly the rd offset (to drop unneeded data before rd). To apply backpressure to the remote end, use stakker::idle! for the read() call.
TcpStreamBuf has a Read trait implementation which may be used to read data from this buffer, updating the rd offset accordingly.
inp_eof: bool
Input EOF flag. This is set by the TcpStreamBuf::read call when it returns ReadStatus::EndOfStream. The application should process the EOF only once it has finished reading any data remaining in the inp buffer.
rd: usize
Offset for reading in input buffer
wr: usize
Offset for writing in input buffer
Implementations
impl TcpStreamBuf
pub fn new() -> Self
Create a new empty TcpStreamBuf, without any stream currently associated.
Example found in the repository:
    fn setup(cx: CX![], stream: TcpStream) -> std::io::Result<Self> {
        let miopoll = cx.anymap_get::<MioPoll>();
        let source = miopoll.add(
            stream,
            Interest::READABLE | Interest::WRITABLE,
            10,
            fwd_to!([cx], ready() as (Ready)),
        )?;
        let mut tcp = TcpStreamBuf::new();
        tcp.init(source);
        Ok(Self { tcp })
    }
pub fn init(&mut self, stream: MioSource<TcpStream>)
After adding a stream to the MioPoll instance with MioPoll::add, store the MioSource here to handle the buffering. TcpStreamBuf takes care of deregistering the stream on drop. The caller should probably call TcpStreamBuf::flush and TcpStreamBuf::read soon after this call.
pub fn deinit(&mut self)
Discard the current stream if there is one, deregistering it from the MioPoll instance.
pub fn set_nodelay(&mut self, nodelay: bool)
Change the TCP_NODELAY setting for the stream. Passing true disables the Nagle algorithm. The change will be applied on the next flush.
pub fn pause_writes(&mut self, pause: bool)
Pause or unpause writes made by the TcpStreamBuf::flush call. If pause is set to true, then TcpStreamBuf::flush does nothing. Initially it is set to false, which allows writes. This may be used on Windows, where writes will fail if attempted before the first “ready for write” indication is received from mio.
pub fn inp_makespace(&mut self, max: usize)
Make space in the inp Vec to read in up to max bytes of data in addition to whatever is already there. Tries to avoid unnecessary copies.
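One plausible way to make room while avoiding copies is sketched below. It is an assumption about how such a routine could work, not the crate's actual implementation; InBuf and makespace are hypothetical names.

```rust
/// Hypothetical stand-in for the input side of the buffer.
struct InBuf {
    inp: Vec<u8>,
    rd: usize,
    wr: usize,
}

impl InBuf {
    /// Ensure that at least `max` writable bytes exist after `wr`.
    fn makespace(&mut self, max: usize) {
        if self.rd == self.wr {
            // Nothing unread: rewind both offsets, no copy needed.
            self.rd = 0;
            self.wr = 0;
        } else if self.wr + max > self.inp.len() && self.rd > 0 {
            // Tail too short: slide the unread bytes down to the front.
            self.inp.copy_within(self.rd..self.wr, 0);
            self.wr -= self.rd;
            self.rd = 0;
        }
        if self.wr + max > self.inp.len() {
            // Still not enough room: grow the pre-filled Vec.
            self.inp.resize(self.wr + max, 0);
        }
    }
}

fn main() {
    // Two unread bytes ("gh") sit at the very end of an 8-byte buffer.
    let mut buf = InBuf { inp: b"abcdefgh".to_vec(), rd: 6, wr: 8 };
    buf.makespace(4);
    // The unread bytes moved to the front; the Vec did not need to grow.
    assert_eq!((buf.rd, buf.wr), (0, 2));
    assert_eq!(&buf.inp[buf.rd..buf.wr], b"gh");
    assert_eq!(buf.inp.len(), 8);
}
```

In this sketch a copy only happens when unread data is present and the tail is too short, which is the case the offsets are meant to keep rare.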
pub fn read(&mut self, max: usize) -> ReadStatus
Read more data and append it to the data currently in the inp buffer. This is non-blocking. Bytes before the rd offset might be dropped from the buffer, and rd might be moved. No more than max bytes are read, which allows regulating the data input rate if that is required. If you need to apply backpressure when under load, call this method from a stakker::idle! handler. This must be called repeatedly until it returns ReadStatus::WouldBlock in order to get another READABLE ready-notification from mio.
Example found in the repository:
    fn ready(&mut self, cx: CX![], ready: Ready) {
        if ready.is_readable() {
            // Keep reading until WouldBlock so that mio sends another READABLE
            loop {
                match self.tcp.read(8192) {
                    ReadStatus::NewData => {
                        // Take everything between the rd and wr offsets
                        let data = self.tcp.inp[self.tcp.rd..self.tcp.wr].to_vec();
                        self.tcp.rd = self.tcp.wr;
                        self.check_special_chars(cx, &data);
                        after!(Duration::from_secs(1), [cx], send_data(data));
                        continue;
                    }
                    ReadStatus::WouldBlock => (),
                    ReadStatus::EndOfStream => {
                        after!(Duration::from_secs(1), [cx], send_eof());
                    }
                    ReadStatus::Error(e) => {
                        fail!(cx, "Read failure on TCP stream: {}", e);
                    }
                }
                // Every status except NewData ends the read loop
                break;
            }
        }
        if ready.is_writable() {
            self.flush(cx);
        }
    }
Trait Implementations
impl Default for TcpStreamBuf
impl Read for TcpStreamBuf
fn read(&mut self, buf: &mut [u8]) -> Result<usize>
Read data from the inp buffer, advancing the rd offset. If there is no data available in inp, returns Ok(0) if the EOF has been reached, otherwise Err(ErrorKind::WouldBlock.into()).
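The three outcomes described above (data, WouldBlock, end-of-file) can be sketched with a hypothetical InBuf type that mirrors the inp, inp_eof, rd and wr fields. It is an illustration of the documented behaviour, not the crate's own implementation.

```rust
use std::io::{self, ErrorKind, Read};

/// Hypothetical stand-in mirroring the input-side fields.
struct InBuf {
    inp: Vec<u8>,
    inp_eof: bool,
    rd: usize,
    wr: usize,
}

impl Read for InBuf {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        let avail = &self.inp[self.rd..self.wr];
        if avail.is_empty() {
            // No buffered data: EOF if the stream has ended, else "try later".
            return if self.inp_eof {
                Ok(0)
            } else {
                Err(ErrorKind::WouldBlock.into())
            };
        }
        // Copy as much as fits and advance the read offset.
        let n = avail.len().min(buf.len());
        buf[..n].copy_from_slice(&avail[..n]);
        self.rd += n;
        Ok(n)
    }
}

fn main() {
    let mut buf = InBuf { inp: b"abc".to_vec(), inp_eof: false, rd: 0, wr: 3 };
    let mut chunk = [0u8; 2];
    assert_eq!(buf.read(&mut chunk).unwrap(), 2);
    assert_eq!(&chunk, b"ab");
    assert_eq!(buf.read(&mut chunk).unwrap(), 1);
    assert_eq!(chunk[0], b'c');
    // Buffer drained but stream still open: WouldBlock rather than Ok(0).
    assert_eq!(buf.read(&mut chunk).unwrap_err().kind(), ErrorKind::WouldBlock);
    // Once EOF has been seen, an empty buffer reads as end-of-file.
    buf.inp_eof = true;
    assert_eq!(buf.read(&mut chunk).unwrap(), 0);
}
```

Callers that use helpers such as read_to_end on a buffer like this need to be prepared for the WouldBlock error while the stream is still open.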
1.36.0 · fn read_vectored(&mut self, bufs: &mut [IoSliceMut<'_>]) -> Result<usize, Error>
Like read, except that it reads into a slice of buffers.
fn is_read_vectored(&self) -> bool
Nightly-only experimental API (can_vector).
1.0.0 · fn read_to_end(&mut self, buf: &mut Vec<u8, Global>) -> Result<usize, Error>
1.0.0 · fn read_to_string(&mut self, buf: &mut String) -> Result<usize, Error>
1.6.0 · fn read_exact(&mut self, buf: &mut [u8]) -> Result<(), Error>
fn read_buf(&mut self, buf: BorrowedCursor<'_>) -> Result<(), Error>
Nightly-only experimental API (read_buf).
fn read_buf_exact(&mut self, cursor: BorrowedCursor<'_>) -> Result<(), Error>
Nightly-only experimental API (read_buf).
1.0.0 · fn by_ref(&mut self) -> &mut Self where Self: Sized
impl Write for TcpStreamBuf
fn flush(&mut self) -> Result<()>
Flush does nothing because we consider the end-target of the write to be the TcpStreamBuf::out buffer.
fn is_write_vectored(&self) -> bool
Nightly-only experimental API (can_vector).
1.0.0 · fn write_all(&mut self, buf: &[u8]) -> Result<(), Error>
fn write_all_vectored(&mut self, bufs: &mut [IoSlice<'_>]) -> Result<(), Error>
Nightly-only experimental API (write_all_vectored).