use super::wire::ServerWire;
use super::*;
use std::io::IoSlice;
#[cfg(feature = "embedded")]
/// One queued unit of response output.
///
/// Each variant records the pieces that are emitted, in order, when the item
/// is copied into a buffer or exposed as I/O slices for a vectored write.
pub(super) enum FastWriteItem {
/// Already-encoded bytes, emitted verbatim.
Bytes(bytes::Bytes),
/// A value emitted as the 8-byte `header` followed by `payload`.
FastValue {
header: [u8; 8],
payload: bytes::Bytes,
},
/// A value emitted as the first `header_len` bytes of `header`, then
/// `payload`, then `RESP_CRLF`.
RespValue {
header: [u8; RESP_HEADER_MAX_LEN],
header_len: u8,
payload: bytes::Bytes,
},
}
#[cfg(feature = "embedded")]
#[derive(Default)]
/// Ordered queue of pending [`FastWriteItem`]s.
///
/// Items are appended at the back and drained from the front, so they leave
/// the queue in the order they were pushed.
pub(crate) struct FastWriteQueue {
/// Pending output, oldest first.
items: Vec<FastWriteItem>,
}
#[cfg(feature = "embedded")]
impl FastWriteQueue {
    /// Moves any bytes buffered in `out` onto the queue as a single
    /// [`FastWriteItem::Bytes`] chunk.
    ///
    /// An empty `out` is left untouched, so the queue never receives an empty
    /// chunk from this method.
    #[inline(always)]
    pub(crate) fn flush_bytes(&mut self, out: &mut BytesMut) {
        if !out.is_empty() {
            self.items.push(FastWriteItem::Bytes(out.split().freeze()));
        }
    }

    /// Emits `payload` as a fast value, either encoded into `out` or queued
    /// by handle.
    ///
    /// * A payload of exactly 64 bytes is encoded into `out` with
    ///   `ServerWire::write_fast_value_64`. This arm is checked first, so it
    ///   wins regardless of the threshold.
    /// * A payload of at least `SCNP_ZERO_COPY_VALUE_THRESHOLD` bytes is
    ///   queued as [`FastWriteItem::FastValue`], holding a clone of the
    ///   `Bytes` handle next to its header.
    /// * Anything else is encoded into `out` with
    ///   `ServerWire::write_fast_value`.
    #[inline(always)]
    pub(crate) fn push_fast_value(&mut self, out: &mut BytesMut, payload: &bytes::Bytes) {
        match payload.len() {
            64 => ServerWire::write_fast_value_64(out, payload.as_ref()),
            len if len >= SCNP_ZERO_COPY_VALUE_THRESHOLD => {
                // Queue what is already buffered first so it stays ahead of
                // this value in output order.
                self.flush_bytes(out);
                self.items.push(FastWriteItem::FastValue {
                    header: ServerWire::fast_value_header(len),
                    payload: payload.clone(),
                });
            }
            _ => ServerWire::write_fast_value(out, payload.as_ref()),
        }
    }

    /// Emits `payload` as a RESP blob string, either encoded into `out` or
    /// queued by handle.
    ///
    /// Payloads shorter than `RESP_ZERO_COPY_VALUE_THRESHOLD` are encoded
    /// into `out` with `ServerWire::write_resp_blob_string`. Longer ones are
    /// queued as [`FastWriteItem::RespValue`] with the header produced by
    /// `ServerWire::resp_blob_header`.
    #[inline(always)]
    pub(crate) fn push_resp_value(&mut self, out: &mut BytesMut, payload: &bytes::Bytes) {
        if payload.len() < RESP_ZERO_COPY_VALUE_THRESHOLD {
            ServerWire::write_resp_blob_string(out, payload.as_ref());
            return;
        }
        // Queue what is already buffered first so it stays ahead of this
        // value in output order.
        self.flush_bytes(out);
        let (header, header_len) = ServerWire::resp_blob_header(payload.len());
        self.items.push(FastWriteItem::RespValue {
            header,
            header_len,
            payload: payload.clone(),
        });
    }

    /// Drains the queue, appending every item's bytes to `out` in queue
    /// order.
    ///
    /// The total size is reserved up front so `out` grows at most once
    /// instead of once per appended piece.
    #[inline(always)]
    fn materialize_into(&mut self, out: &mut BytesMut) {
        let pending_len: usize = self.items.iter().map(FastWriteItem::byte_len).sum();
        out.reserve(pending_len);
        for item in self.items.drain(..) {
            match item {
                FastWriteItem::Bytes(bytes) => out.extend_from_slice(bytes.as_ref()),
                FastWriteItem::FastValue { header, payload } => {
                    out.extend_from_slice(&header);
                    out.extend_from_slice(payload.as_ref());
                }
                FastWriteItem::RespValue {
                    header,
                    header_len,
                    payload,
                } => {
                    out.extend_from_slice(&header[..header_len as usize]);
                    out.extend_from_slice(payload.as_ref());
                    out.extend_from_slice(RESP_CRLF);
                }
            }
        }
    }

    /// Returns `true` when nothing is queued.
    #[inline(always)]
    pub(super) fn is_empty(&self) -> bool {
        self.items.is_empty()
    }

    /// Writes every queued item to `stream`, in batches of at most
    /// `MAX_WRITEV_IOVECS` I/O slices.
    ///
    /// A batch consisting of a single [`FastWriteItem::Bytes`] chunk is sent
    /// with a plain `write_all`; every other batch goes through the vectored
    /// writer.
    ///
    /// # Errors
    ///
    /// Returns the first I/O error reported by `stream`. Each batch is
    /// removed from the queue before it is written, so the batch that failed
    /// is dropped; items not yet drained remain queued.
    pub(super) async fn write_pending_tokio<W>(&mut self, stream: &mut W) -> std::io::Result<()>
    where
        W: AsyncWrite + Unpin,
    {
        const MAX_WRITEV_IOVECS: usize = 1024;
        while !self.items.is_empty() {
            let mut batch = self.drain_iovec_batch(MAX_WRITEV_IOVECS);
            if batch.len() == 1 {
                match batch.pop().expect("single-item write batch") {
                    FastWriteItem::Bytes(bytes) => {
                        stream.write_all(bytes.as_ref()).await?;
                        continue;
                    }
                    // Not a plain chunk: put it back and use the vectored path.
                    item => batch.push(item),
                }
            }
            FastWriteTokioBatch::write_all(stream, FastWriteTokioBatch::new(batch)).await?;
        }
        Ok(())
    }

    /// Drains `queue` into `out` when a non-empty queue is supplied;
    /// otherwise does nothing.
    #[inline(always)]
    pub(super) fn materialize_optional(queue: Option<&mut FastWriteQueue>, out: &mut BytesMut) {
        if let Some(queue) = queue
            && !queue.is_empty()
        {
            queue.materialize_into(out);
        }
    }

    /// Removes and returns the longest queue prefix whose combined iovec
    /// count does not exceed `max_iovecs`.
    ///
    /// The first item is always taken when the queue is non-empty, even if it
    /// alone needs more than `max_iovecs` slices, so repeated calls always
    /// make progress.
    #[inline(always)]
    pub(super) fn drain_iovec_batch(&mut self, max_iovecs: usize) -> Vec<FastWriteItem> {
        let mut take = 0usize;
        let mut iovecs = 0usize;
        for item in &self.items {
            let item_iovecs = item.iovec_count();
            if take > 0 && iovecs + item_iovecs > max_iovecs {
                break;
            }
            take += 1;
            iovecs += item_iovecs;
            if iovecs >= max_iovecs {
                break;
            }
        }
        self.items.drain(..take).collect()
    }
}
#[cfg(feature = "embedded")]
impl FastWriteItem {
    /// Number of I/O slices this item contributes when written from its
    /// start: one per piece it is made of.
    #[inline(always)]
    fn iovec_count(&self) -> usize {
        match *self {
            Self::Bytes(_) => 1,
            Self::FastValue { .. } => 2,
            Self::RespValue { .. } => 3,
        }
    }

    /// Total number of bytes this item emits, counting header, payload and
    /// (for RESP values) the trailing `RESP_CRLF`.
    #[inline(always)]
    fn byte_len(&self) -> usize {
        match self {
            Self::Bytes(chunk) => chunk.len(),
            Self::FastValue { header, payload } => header.len() + payload.len(),
            Self::RespValue {
                header_len,
                payload,
                ..
            } => {
                let used_header = *header_len as usize;
                used_header + payload.len() + RESP_CRLF.len()
            }
        }
    }

    /// Appends to `out` the slices covering this item's bytes from `offset`
    /// onwards, where `offset` counts bytes from the start of the item.
    #[inline(always)]
    fn push_tokio_iovecs<'a>(&'a self, offset: usize, out: &mut Vec<IoSlice<'a>>) {
        match self {
            Self::Bytes(chunk) => push_slice_with_offset(chunk.as_ref(), offset, out),
            Self::FastValue { header, payload } => {
                push_two_with_offset(header.as_slice(), payload.as_ref(), offset, out)
            }
            Self::RespValue {
                header,
                header_len,
                payload,
            } => {
                // Only the first `header_len` bytes of the header are emitted.
                let used_header = &header[..*header_len as usize];
                push_three_with_offset(used_header, payload.as_ref(), RESP_CRLF, offset, out);
            }
        }
    }
}
#[cfg(feature = "embedded")]
/// Progress tracker for writing a batch of [`FastWriteItem`]s with vectored
/// writes that may complete only partially.
struct FastWriteTokioBatch {
/// Items of the batch, in write order.
items: Vec<FastWriteItem>,
/// Index of the first item that has not been written completely.
item_index: usize,
/// Number of bytes of `items[item_index]` that have already been written.
item_offset: usize,
/// Bytes of the whole batch that are still to be written.
remaining_len: usize,
}
#[cfg(feature = "embedded")]
impl FastWriteTokioBatch {
    /// Wraps `items` with the write cursor at the start and `remaining_len`
    /// set to the combined byte length of all items.
    #[inline(always)]
    fn new(items: Vec<FastWriteItem>) -> Self {
        let remaining_len = items.iter().map(FastWriteItem::byte_len).sum();
        Self {
            items,
            item_index: 0,
            item_offset: 0,
            remaining_len,
        }
    }

    /// Bytes of the batch that have not been written yet.
    #[inline(always)]
    fn remaining_len(&self) -> usize {
        self.remaining_len
    }

    /// Replaces the contents of `out` with slices covering the unwritten part
    /// of the batch, keeping at most `max_iovecs` of them.
    ///
    /// The item under the cursor is sliced from `item_offset`; later items
    /// are included from their first byte.
    #[inline(always)]
    fn fill_iovecs<'a>(&'a self, out: &mut Vec<IoSlice<'a>>, max_iovecs: usize) {
        out.clear();
        for (idx, item) in self.items.iter().enumerate().skip(self.item_index) {
            let offset = if idx == self.item_index {
                self.item_offset
            } else {
                0
            };
            item.push_tokio_iovecs(offset, out);
            if out.len() >= max_iovecs {
                // An item may push several slices at once; drop the excess.
                out.truncate(max_iovecs);
                break;
            }
        }
    }

    /// Advances the cursor by `amt` written bytes and reduces
    /// `remaining_len` accordingly.
    ///
    /// If `amt` runs past the last item, the batch is marked fully written.
    #[inline(always)]
    fn consume(&mut self, mut amt: usize) {
        let consumed = amt;
        while amt > 0 {
            let Some(item) = self.items.get(self.item_index) else {
                self.remaining_len = 0;
                return;
            };
            let item_remaining = item.byte_len().saturating_sub(self.item_offset);
            match item_remaining.cmp(&amt) {
                std::cmp::Ordering::Greater => {
                    // The write ended inside this item.
                    self.item_offset += amt;
                    break;
                }
                std::cmp::Ordering::Equal => {
                    self.item_index += 1;
                    self.item_offset = 0;
                    break;
                }
                std::cmp::Ordering::Less => {
                    amt -= item_remaining;
                    self.item_index += 1;
                    self.item_offset = 0;
                }
            }
        }
        self.remaining_len = self.remaining_len.saturating_sub(consumed);
    }

    /// Writes the whole batch to `stream`, repeating vectored writes until
    /// nothing remains.
    ///
    /// # Errors
    ///
    /// * Any error returned by `stream.write_vectored`.
    /// * `ErrorKind::WriteZero` if the stream accepts zero bytes while data
    ///   is still pending.
    /// * An error if the stream reports more bytes written than the batch
    ///   still had pending, matching the check on the `writev` path.
    async fn write_all<W>(stream: &mut W, mut batch: FastWriteTokioBatch) -> std::io::Result<()>
    where
        W: AsyncWrite + Unpin,
    {
        const MAX_WRITEV_IOVECS: usize = 1024;
        // Size the scratch vector to what this batch can need rather than
        // always allocating the maximum on every write attempt.
        let iovec_capacity = batch
            .items
            .iter()
            .map(FastWriteItem::iovec_count)
            .sum::<usize>()
            .min(MAX_WRITEV_IOVECS);
        while batch.remaining_len() > 0 {
            let remaining = batch.remaining_len();
            let written = {
                // The slices borrow `batch`, so they must be dropped before
                // `consume` can take it mutably.
                let mut iovecs = Vec::with_capacity(iovec_capacity);
                batch.fill_iovecs(&mut iovecs, MAX_WRITEV_IOVECS);
                stream.write_vectored(&iovecs).await?
            };
            if written == 0 {
                return Err(std::io::Error::new(
                    std::io::ErrorKind::WriteZero,
                    "failed to write whole vectored response batch",
                ));
            }
            if written > remaining {
                return Err(std::io::Error::other(
                    "write_vectored reported more bytes than the response batch owns",
                ));
            }
            batch.consume(written);
        }
        Ok(())
    }
}
#[cfg(feature = "embedded")]
/// Appends the part of `slice` that starts at `offset` to `out`.
///
/// Nothing is appended when `offset` is at or beyond the end of `slice`, so
/// `out` never receives an empty slice from this function.
#[inline(always)]
fn push_slice_with_offset<'a>(slice: &'a [u8], offset: usize, out: &mut Vec<IoSlice<'a>>) {
    let tail = slice.get(offset..).filter(|rest| !rest.is_empty());
    if let Some(rest) = tail {
        out.push(IoSlice::new(rest));
    }
}
#[cfg(feature = "embedded")]
/// Appends slices for the concatenation `first ++ second`, skipping its
/// first `offset` bytes.
///
/// When `offset` falls inside `first`, the rest of `first` and all of
/// `second` are appended. Otherwise only the not-yet-skipped tail of
/// `second` is appended, if any.
#[inline(always)]
fn push_two_with_offset<'a>(
    first: &'a [u8],
    second: &'a [u8],
    offset: usize,
    out: &mut Vec<IoSlice<'a>>,
) {
    match offset.checked_sub(first.len()) {
        // `first` is skipped entirely; the leftover offset applies to `second`.
        Some(into_second) => push_slice_with_offset(second, into_second, out),
        // `offset < first.len()`: part of `first` is still pending.
        None => {
            out.push(IoSlice::new(&first[offset..]));
            out.push(IoSlice::new(second));
        }
    }
}
#[cfg(feature = "embedded")]
/// Appends slices for the concatenation `first ++ second ++ third`, skipping
/// its first `offset` bytes.
///
/// The segment that contains `offset` is appended from that position and
/// every later segment is appended whole. If `offset` reaches past `second`,
/// only the not-yet-skipped tail of `third` is appended, if any.
#[inline(always)]
fn push_three_with_offset<'a>(
    first: &'a [u8],
    second: &'a [u8],
    third: &'a [u8],
    offset: usize,
    out: &mut Vec<IoSlice<'a>>,
) {
    // `None` means `offset < first.len()`: the cut is inside `first`.
    let Some(past_first) = offset.checked_sub(first.len()) else {
        out.push(IoSlice::new(&first[offset..]));
        out.push(IoSlice::new(second));
        out.push(IoSlice::new(third));
        return;
    };
    // `None` means the cut is inside `second`.
    let Some(past_second) = past_first.checked_sub(second.len()) else {
        out.push(IoSlice::new(&second[past_first..]));
        out.push(IoSlice::new(third));
        return;
    };
    push_slice_with_offset(third, past_second, out);
}
#[cfg(all(target_os = "linux", feature = "embedded", feature = "monoio"))]
/// A batch of [`FastWriteItem`]s paired with raw `iovec` descriptors that
/// point into the items' bytes, plus a cursor tracking partial writes.
pub(super) struct FastWriteBatchIoVec {
/// Owns the headers and payloads that `iovecs` point at; kept alongside the
/// descriptors so the referenced memory lives as long as the batch.
items: Vec<FastWriteItem>,
/// One descriptor per emitted piece, in write order. Entries are adjusted
/// in place as bytes are consumed.
iovecs: Vec<libc::iovec>,
/// Index into `iovecs` of the first descriptor not yet fully written.
offset: usize,
/// Bytes of the whole batch that are still to be written.
remaining_len: usize,
}
#[cfg(all(target_os = "linux", feature = "embedded", feature = "monoio"))]
impl FastWriteBatchIoVec {
    /// Builds the descriptor list for `items`: one `iovec` per piece of each
    /// item, in write order, with `remaining_len` equal to their combined
    /// length.
    #[inline(always)]
    pub(super) fn new(items: Vec<FastWriteItem>) -> Self {
        /// Describes `len` bytes starting at `base` as a raw `iovec`.
        #[inline(always)]
        fn raw_iovec(base: *const u8, len: usize) -> libc::iovec {
            libc::iovec {
                iov_base: base as *mut libc::c_void,
                iov_len: len,
            }
        }

        let descriptor_count: usize = items.iter().map(FastWriteItem::iovec_count).sum();
        let mut iovecs = Vec::with_capacity(descriptor_count);
        // The pointers target the `Bytes` buffers and the headers stored
        // inside the `items` allocation; moving the `Vec` handle into the
        // struct below does not relocate either.
        for item in &items {
            match item {
                FastWriteItem::Bytes(bytes) => {
                    iovecs.push(raw_iovec(bytes.as_ptr(), bytes.len()));
                }
                FastWriteItem::FastValue { header, payload } => {
                    iovecs.push(raw_iovec(header.as_ptr(), header.len()));
                    iovecs.push(raw_iovec(payload.as_ptr(), payload.len()));
                }
                FastWriteItem::RespValue {
                    header,
                    header_len,
                    payload,
                } => {
                    // Only the first `header_len` bytes of the header are sent.
                    iovecs.push(raw_iovec(header.as_ptr(), *header_len as usize));
                    iovecs.push(raw_iovec(payload.as_ptr(), payload.len()));
                    iovecs.push(raw_iovec(RESP_CRLF.as_ptr(), RESP_CRLF.len()));
                }
            }
        }
        let remaining_len = iovecs.iter().map(|descriptor| descriptor.iov_len).sum();
        Self {
            items,
            iovecs,
            offset: 0,
            remaining_len,
        }
    }

    /// Bytes of the batch that have not been written yet.
    #[inline(always)]
    fn remaining_len(&self) -> usize {
        self.remaining_len
    }

    /// Advances the cursor by `amt` written bytes and reduces
    /// `remaining_len` accordingly.
    ///
    /// Fully written descriptors are skipped; a partially written one is
    /// shrunk in place. If `amt` runs past the last descriptor, the batch is
    /// marked fully written.
    #[inline(always)]
    fn consume(&mut self, mut amt: usize) {
        let total = amt;
        while amt > 0 {
            let Some(front) = self.iovecs.get_mut(self.offset) else {
                self.remaining_len = 0;
                return;
            };
            if amt < front.iov_len {
                // SAFETY: `amt < front.iov_len`, so the advanced pointer stays
                // inside the byte range this descriptor was created for.
                front.iov_base = unsafe { front.iov_base.add(amt) };
                front.iov_len -= amt;
                break;
            }
            // The whole descriptor was written; move on to the next one.
            amt -= front.iov_len;
            self.offset += 1;
        }
        self.remaining_len = self.remaining_len.saturating_sub(total);
    }

    /// Writes the whole batch to `stream`, repeating `writev` until nothing
    /// remains.
    ///
    /// # Errors
    ///
    /// * Any error returned by `writev`.
    /// * `ErrorKind::WriteZero` if `writev` accepts zero bytes while data is
    ///   still pending.
    /// * An error if `writev` reports more bytes than the batch still had
    ///   pending.
    pub(super) async fn write_all(
        stream: &mut monoio::net::TcpStream,
        mut batch: FastWriteBatchIoVec,
    ) -> std::io::Result<()> {
        use monoio::io::AsyncWriteRent;
        loop {
            let outstanding = batch.remaining_len();
            if outstanding == 0 {
                return Ok(());
            }
            // `writev` takes the buffer by value and hands it back with the result.
            let (outcome, returned) = stream.writev(batch).await;
            batch = returned;
            match outcome? {
                0 => {
                    return Err(std::io::Error::new(
                        std::io::ErrorKind::WriteZero,
                        "failed to write whole vectored response batch",
                    ));
                }
                written if written > outstanding => {
                    return Err(std::io::Error::other(
                        "writev reported more bytes than the response batch owns",
                    ));
                }
                written => batch.consume(written),
            }
        }
    }
}
#[cfg(all(target_os = "linux", feature = "embedded", feature = "monoio"))]
// SAFETY: the exposed descriptors live in `self.iovecs` and point at memory
// owned by `self.items`, both of which live as long as `self`.
// NOTE(review): confirm this covers every requirement of monoio's `IoVecBuf`
// contract (e.g. address stability while an operation is in flight).
unsafe impl monoio::buf::IoVecBuf for FastWriteBatchIoVec {
    /// Pointer to the first descriptor that has not been fully written.
    ///
    /// Uses a bounds-checked subslice instead of raw pointer arithmetic. If
    /// `offset` were ever past the end, this yields an empty slice's pointer,
    /// paired with a length of zero from `read_iovec_len`.
    fn read_iovec_ptr(&self) -> *const libc::iovec {
        self.iovecs
            .get(self.offset..)
            .unwrap_or_default()
            .as_ptr()
    }

    /// Number of descriptors from the cursor to the end of the list.
    fn read_iovec_len(&self) -> usize {
        self.iovecs.len().saturating_sub(self.offset)
    }
}