use crate::Commands;
use crate::shard::Shard;
use crate::uring_bigbulk_probe::{
BigArgGenericProbe, MAX_BULK_LEN, probe_generic_bigbulk,
};
use crate::uring_conn::{BigArgState, UringConn};
use kevy_map::KevyMap;
impl<C: Commands> Shard<C> {
/// Attempts to take over handling of a big bulk argument that starts in `tail`.
///
/// `probe_generic_bigbulk` decides whether `tail` qualifies. The call declines
/// and returns `false` when the probe does not ask for promotion, when `cid`
/// has no entry in `io`, when that connection already has a pending big-arg
/// state, or when the probed `total` exceeds `MAX_BULK_LEN + 1024`.
///
/// Once promoted, one of two paths runs:
///
/// * The probe reported a `bare_set_key_range` and `shard_of` maps that key to
///   this shard: the body bytes already present in `tail` are copied into an
///   owned buffer. If the whole body plus its two trailing bytes are present
///   the command is dispatched right away through `dispatch_bareset_owned`;
///   otherwise a `BigArgState::BareSetCancelling` state is parked on the
///   connection, `big_arg_cancel_pending` is raised and `mark_arm_pending` is
///   called.
/// * Anything else is handed to `install_frame_state`, whose result is
///   returned unchanged.
///
/// Returns `true` when the input was taken over by one of these paths.
pub(crate) fn try_promote_bigbulk(
    &mut self,
    cid: u64,
    tail: &[u8],
    io: &mut KevyMap<u64, UringConn>,
) -> bool {
    let BigArgGenericProbe::Promote {
        total,
        bytes_present,
        body_start_in_tail,
        body_len,
        bare_set_key_range,
    } = probe_generic_bigbulk(tail)
    else {
        return false;
    };
    let Some(uc) = io.get_mut(&cid) else { return false };
    // Only one big-arg state may be in flight per connection.
    if uc.pending_big_arg.is_some() {
        return false;
    }
    // NOTE(review): the 1024-byte slack presumably covers the command framing
    // around the bulk payload — confirm against the probe.
    if total > MAX_BULK_LEN + 1024 {
        return false;
    }
    if let Some((k_start, k_end)) = bare_set_key_range {
        // Check ownership on the borrowed key and copy it only once this shard
        // is known to own it, so keys owned elsewhere cost no allocation.
        let key_bytes = &tail[k_start..k_end];
        if self.shard_of(key_bytes) == self.id {
            let key = key_bytes.to_vec();
            let mut body = Vec::with_capacity(body_len);
            // Number of body bytes that already arrived inside `tail`.
            let body_in_slab =
                bytes_present.saturating_sub(body_start_in_tail).min(body_len);
            body.extend_from_slice(
                &tail[body_start_in_tail..body_start_in_tail + body_in_slab],
            );
            // Number of trailing bytes (at most 2) present after the body.
            // They are counted here, not inspected.
            let crlf_in_slab = bytes_present
                .saturating_sub(body_start_in_tail + body_in_slab)
                .min(2);
            if body.len() == body_len && crlf_in_slab == 2 {
                // Everything is already here: no pending state is needed.
                self.dispatch_bareset_owned(cid, key, body, body_len, io);
                return true;
            }
            let Some(uc) = io.get_mut(&cid) else { return false };
            uc.pending_big_arg = Some(Box::new(BigArgState::BareSetCancelling {
                key,
                body,
                body_len,
                // `crlf_in_slab` is clamped to 0..=2 above, so the cast is lossless.
                crlf_seen: crlf_in_slab as u8,
                cancel_acked: false,
                target_canceled: false,
            }));
            uc.big_arg_cancel_pending = true;
            self.mark_arm_pending(cid, io);
            return true;
        }
    }
    self.install_frame_state(cid, total, bytes_present, tail, io)
}
/// Starts buffering a `total`-byte frame whose first bytes are in `tail`.
///
/// Copies the first `min(bytes_present, total)` bytes of `tail` into a buffer
/// reserved for `total` bytes. A frame that is already complete is passed
/// straight to `uring_apply_frame_stitch`; an incomplete one is stored on the
/// connection as `BigArgState::Frame`.
///
/// Returns `false` only when the frame is incomplete and `cid` has no entry
/// in `io`; returns `true` otherwise.
fn install_frame_state(
    &mut self,
    cid: u64,
    total: usize,
    bytes_present: usize,
    tail: &[u8],
    io: &mut KevyMap<u64, UringConn>,
) -> bool {
    let have = bytes_present.min(total);
    let mut frame = Vec::with_capacity(total);
    frame.extend_from_slice(&tail[..have]);

    // `have` never exceeds `total`, so "shorter than total" means incomplete.
    if frame.len() < total {
        return match io.get_mut(&cid) {
            Some(uc) => {
                uc.pending_big_arg = Some(Box::new(BigArgState::Frame { frame, total }));
                true
            }
            None => false,
        };
    }

    // The whole frame is already available: dispatch it without parking state.
    self.uring_apply_frame_stitch(cid, frame, io);
    true
}
/// Feeds freshly received bytes (`slab`) into the pending big-arg state of
/// connection `cid`.
///
/// Does nothing when `cid` has no entry in `io` or the connection has no
/// pending big-arg state. Otherwise the pending state consumes as many leading
/// bytes of `slab` as it still needs; once the state is complete it is taken
/// off the connection and dispatched. Any bytes of `slab` that were not
/// consumed are forwarded to `uring_bigbulk_feed_pipelined`.
pub(crate) fn uring_bigbulk_feed(
&mut self,
cid: u64,
io: &mut KevyMap<u64, UringConn>,
slab: &[u8],
) {
let Some(uc) = io.get_mut(&cid) else { return };
let Some(state) = uc.pending_big_arg.as_mut() else { return };
// `take` is the number of leading `slab` bytes consumed by the pending state.
let take = match state.as_mut() {
BigArgState::Frame { frame, total } => {
// NOTE(review): unchecked subtraction — relies on `frame.len() <= *total`
// always holding for a stored `Frame` state; confirm.
let need = *total - frame.len();
let t = slab.len().min(need);
if t > 0 {
frame.extend_from_slice(&slab[..t]);
}
if frame.len() == *total {
// Frame complete: move it out of the connection (leaving no pending
// state behind) before dispatching it.
if let Some(boxed) = uc.pending_big_arg.take()
&& let BigArgState::Frame { frame, .. } = *boxed
{
self.uring_apply_frame_stitch(cid, frame, io);
}
}
t
}
BigArgState::BareSetCancelling {
body,
body_len,
crlf_seen,
..
} => {
// Body bytes come first. When the body is still incomplete after this
// append, `take_body == slab.len()`, so no trailing bytes are counted below.
let take_body = (*body_len - body.len()).min(slab.len());
if take_body > 0 {
body.extend_from_slice(&slab[..take_body]);
}
// Then up to two trailing bytes are counted; they are not inspected here.
let crlf_pending = 2 - *crlf_seen as usize;
let take_crlf = crlf_pending.min(slab.len() - take_body);
*crlf_seen += take_crlf as u8;
let t = take_body + take_crlf;
let complete = body.len() == *body_len && *crlf_seen == 2;
if complete {
// Clear the cancel-pending flag, then move the state out and dispatch
// the owned key/body.
uc.big_arg_cancel_pending = false;
if let Some(boxed) = uc.pending_big_arg.take()
&& let BigArgState::BareSetCancelling { key, body, body_len, .. } = *boxed
{
self.dispatch_bareset_owned(cid, key, body, body_len, io);
}
}
t
}
BigArgState::BareSetReading { .. } => {
// Nothing is consumed in this state, so a non-empty `slab` is forwarded
// whole to the pipelined path below while the state stays installed.
// NOTE(review): confirm this arm is not expected to see data here.
0
}
};
// Hand whatever the pending state did not consume to the pipelined path.
if take < slab.len() {
self.uring_bigbulk_feed_pipelined(cid, io, &slab[take..]);
}
}
/// Runs the bytes in `extra` through `uring_recv_dispatch` for connection `cid`.
///
/// The connection's `input` buffer is moved out for the duration of the
/// dispatch and written back afterwards, unless the dispatch reports the
/// connection as gone. A reported protocol error is answered with
/// `protocol_error` followed by `uring_mark_closing`.
///
/// Does nothing when `cid` is not present in `self.conns`.
fn uring_bigbulk_feed_pipelined(
    &mut self,
    cid: u64,
    io: &mut KevyMap<u64, UringConn>,
    extra: &[u8],
) {
    let Some(conn) = self.conns.get_mut(&cid) else {
        return;
    };
    // Detach the input buffer so `self` can be borrowed again by the dispatcher.
    let mut scratch = std::mem::take(&mut conn.input);

    let result = self.uring_recv_dispatch(cid, extra, &mut scratch, io);
    if result.conn_gone {
        // The connection went away during dispatch; the buffer is dropped.
        return;
    }

    // Re-attach the buffer if the connection is still registered.
    if let Some(conn) = self.conns.get_mut(&cid) {
        conn.input = scratch;
    }

    if result.protocol_error {
        self.protocol_error(cid);
        self.uring_mark_closing(cid, io);
    }
}
/// Dispatches a fully assembled `frame` for connection `cid` via `dispatch_batch`.
///
/// * Connection reported gone: nothing further happens.
/// * Protocol error reported: `protocol_error` is called and the connection is
///   marked closing through `uring_mark_closing`.
/// * Otherwise: debug builds assert that the dispatcher consumed exactly
///   `frame.len()` bytes.
fn uring_apply_frame_stitch(
    &mut self,
    cid: u64,
    frame: Vec<u8>,
    io: &mut KevyMap<u64, UringConn>,
) {
    let result = self.dispatch_batch(cid, &frame);
    match (result.conn_gone, result.protocol_error) {
        // A gone connection takes precedence over any reported error.
        (true, _) => {}
        (false, true) => {
            self.protocol_error(cid);
            self.uring_mark_closing(cid, io);
        }
        (false, false) => {
            debug_assert_eq!(
                result.consumed,
                frame.len(),
                "frame-stitch: parser consumed != probe total"
            );
        }
    }
}
}
// Test-only module; it currently contains no tests.
#[cfg(test)]
mod tests {
}