use crate::Commands;
use crate::shard::Shard;
use crate::uring_bigbulk_probe::{
BigArgGenericProbe, MAX_BULK_LEN, probe_generic_bigbulk,
};
use crate::uring_conn::{BigArgState, UringConn};
use kevy_map::KevyMap;
impl<C: Commands> Shard<C> {
/// Tries to move connection `cid` onto the big-argument accumulation path.
///
/// `tail` is handed to `probe_generic_bigbulk`; only a `Promote` verdict
/// continues. Returns `false` (with nothing modified) when:
/// - the probe does not report `Promote`,
/// - `cid` has no entry in `io`,
/// - the connection already has a `pending_big_arg`, or
/// - the probed `total` exceeds `MAX_BULK_LEN + 1024`.
///
/// Otherwise the first `min(bytes_present, total)` bytes of `tail` are
/// copied into a frame preallocated to `total` bytes. A frame that is already
/// complete is dispatched immediately; an incomplete one is parked in
/// `pending_big_arg`. Both outcomes return `true`.
pub(crate) fn try_promote_bigbulk(
    &mut self,
    cid: u64,
    tail: &[u8],
    io: &mut KevyMap<u64, UringConn>,
) -> bool {
    let (total, bytes_present) = match probe_generic_bigbulk(tail) {
        BigArgGenericProbe::Promote { total, bytes_present } => (total, bytes_present),
        _ => return false,
    };

    // Only a known connection with no accumulation in flight may be promoted.
    let conn = match io.get_mut(&cid) {
        Some(conn) if conn.pending_big_arg.is_none() => conn,
        _ => return false,
    };

    // NOTE(review): the extra 1024 is presumably headroom for protocol framing
    // around the bulk payload — confirm against the probe's definition of `total`.
    if total > MAX_BULK_LEN + 1024 {
        return false;
    }

    // NOTE(review): slicing assumes the probe never reports more bytes than
    // `tail` actually holds — confirm against `probe_generic_bigbulk`.
    let prefix_len = total.min(bytes_present);
    let mut frame = Vec::with_capacity(total);
    frame.extend_from_slice(&tail[..prefix_len]);

    if frame.len() < total {
        // Still incomplete: park it until later feeds fill the remainder.
        conn.pending_big_arg = Some(Box::new(BigArgState { frame, total }));
    } else {
        // The whole frame was already present in `tail`.
        self.uring_apply_frame_stitch(cid, frame, io);
    }
    true
}
/// Feeds freshly received bytes (`slab`) into the pending big-argument frame
/// of connection `cid`.
///
/// Does nothing when `cid` is absent from `io` or has no `pending_big_arg`.
/// At most the number of bytes still missing from the frame is appended.
/// Once the frame reaches its recorded `total`, it is removed from the
/// connection and dispatched. Any bytes of `slab` beyond the frame boundary
/// are then forwarded to `uring_bigbulk_feed_pipelined`.
pub(crate) fn uring_bigbulk_feed(
    &mut self,
    cid: u64,
    io: &mut KevyMap<u64, UringConn>,
    slab: &[u8],
) {
    let Some(conn) = io.get_mut(&cid) else { return };
    let Some(pending) = conn.pending_big_arg.as_mut() else { return };

    // Split the slab at the frame boundary: `head` belongs to the frame,
    // `rest` (possibly empty) is whatever follows it.
    let missing = pending.total - pending.frame.len();
    let (head, rest) = slab.split_at(slab.len().min(missing));
    // Appending an empty `head` is a no-op, so no separate emptiness check.
    pending.frame.extend_from_slice(head);

    if pending.frame.len() == pending.total {
        let done = conn.pending_big_arg.take().expect("just observed");
        self.uring_apply_frame_stitch(cid, done.frame, io);
    }

    // A non-empty `rest` implies the frame was completed just above.
    if !rest.is_empty() {
        self.uring_bigbulk_feed_pipelined(cid, io, rest);
    }
}
/// Dispatches `extra` — bytes that followed a completed big-argument frame —
/// through `uring_recv_dispatch`.
///
/// The connection's `input` buffer is moved out for the duration of the
/// dispatch and put back afterwards if the connection still exists. Returns
/// early when `cid` is not in `self.conns`, or when the dispatch reports the
/// connection gone (the buffer is then dropped). On a protocol error the
/// error is reported and the connection is marked closing.
fn uring_bigbulk_feed_pipelined(
    &mut self,
    cid: u64,
    io: &mut KevyMap<u64, UringConn>,
    extra: &[u8],
) {
    let Some(conn) = self.conns.get_mut(&cid) else { return };
    // Move the buffer out so it can be passed alongside `&mut self`.
    let mut carried = std::mem::take(&mut conn.input);

    let outcome = self.uring_recv_dispatch(cid, extra, &mut carried, io);
    if outcome.conn_gone {
        return;
    }

    // Hand the buffer back before any error handling touches the connection.
    if let Some(conn) = self.conns.get_mut(&cid) {
        conn.input = carried;
    }

    if outcome.protocol_error {
        self.protocol_error(cid);
        self.uring_mark_closing(cid, io);
    }
}
/// Dispatches a fully assembled `frame` for connection `cid` via
/// `dispatch_batch`.
///
/// Nothing further happens if the dispatch reports the connection gone. On a
/// protocol error the error is reported and the connection is marked closing.
/// Otherwise, debug builds assert that the dispatch consumed exactly
/// `frame.len()` bytes.
fn uring_apply_frame_stitch(
    &mut self,
    cid: u64,
    frame: Vec<u8>,
    io: &mut KevyMap<u64, UringConn>,
) {
    let outcome = self.dispatch_batch(cid, &frame);
    if outcome.conn_gone {
        return;
    }

    if !outcome.protocol_error {
        // Success path: the parser should have eaten the entire frame.
        debug_assert_eq!(
            outcome.consumed,
            frame.len(),
            "frame-stitch: parser consumed != probe total"
        );
        return;
    }

    self.protocol_error(cid);
    self.uring_mark_closing(cid, io);
}
}
// Test-only module; it currently contains no test functions.
#[cfg(test)]
mod tests {
}