use crate::Commands;
use crate::shard::Shard;
use crate::uring_conn::{BigArgState, UringConn};
use kevy_map::KevyMap;
impl<C: Commands> Shard<C> {
/// Handles the completion of a read issued for a connection's pending
/// "big arg" while it is in the `BareSetReading` state.
///
/// * `cid` - id of the connection, used as the key into `io`.
/// * `res` - result of the completed read. Values `<= 0` tear the pending
///   state down and mark the connection as closing; positive values are
///   taken as the number of bytes delivered.
/// * `io`  - per-connection state map.
///
/// The delivered bytes are credited first to the body (up to `body_len`)
/// and then to at most two trailing bytes counted in `crlf_seen`
/// (presumably the RESP `\r\n` terminator - confirm against the code that
/// arms the read). Once the body is complete the SET is dispatched through
/// `dispatch_bareset_owned`; otherwise another read is requested.
pub(crate) fn uring_on_big_arg_read(
&mut self,
cid: u64,
res: i32,
io: &mut KevyMap<u64, UringConn>,
) {
// Unknown connection id: nothing to update.
let Some(uc) = io.get_mut(&cid) else { return };
if res <= 0 {
// Failed or zero-length read: drop the partially received argument,
// clear the big-arg flags and hand the connection to the closing path.
uc.pending_big_arg = None;
uc.big_arg_read_pending = false;
uc.big_arg_rearm_recv = false;
self.uring_mark_closing(cid, io);
return;
}
// Only the `BareSetReading` state is handled here; any other state (or no
// pending state at all) leaves the connection untouched.
let Some(state) = uc.pending_big_arg.as_mut() else { return };
let BigArgState::BareSetReading {
body,
body_len,
crlf_seen,
..
} = state.as_mut()
else {
return;
};
let n = res as usize;
// Bytes still missing from the body, and how many of the `n` delivered
// bytes belong to it.
// NOTE(review): this subtraction assumes `body.len() <= *body_len`.
let body_room = *body_len - body.len();
let body_n = n.min(body_room);
if body_n > 0 {
unsafe {
// SAFETY (assumed, not verifiable from this file): the completed read
// is expected to have written `body_n` bytes directly into `body`'s
// spare capacity, and that capacity is expected to be at least
// `*body_len`, so extending the length only exposes initialized,
// in-bounds bytes. TODO confirm where the read is armed.
body.set_len(body.len() + body_n);
}
}
// Whatever was delivered beyond the body counts toward the trailing bytes,
// capped so that `crlf_seen` never exceeds 2.
let crlf_n = ((n - body_n).min(2 - *crlf_seen as usize)) as u8;
*crlf_seen += crlf_n;
if body.len() == *body_len {
// Body complete. Trailing bytes that have not arrived yet are recorded
// below in `pending_crlf_skip` rather than waited for here.
let crlf_pending_after_dispatch = 2 - *crlf_seen as usize;
// Move the state out of the connection so the key and body can be
// passed to the dispatcher by value.
if let Some(boxed) = uc.pending_big_arg.take()
&& let BigArgState::BareSetReading { key, body, body_len, .. } = *boxed
{
self.dispatch_bareset_owned(cid, key, body, body_len, io);
}
// Look the connection up again: the earlier `uc` borrow cannot be held
// across the dispatch call, which takes `io` itself.
if let Some(uc) = io.get_mut(&cid) {
uc.big_arg_read_pending = false;
uc.big_arg_rearm_recv = true;
uc.pending_crlf_skip = crlf_pending_after_dispatch as u8;
}
self.mark_arm_pending(cid, io);
} else {
// Body still incomplete: flag that another big-arg read is wanted.
uc.big_arg_read_pending = true;
self.mark_arm_pending(cid, io);
}
}
/// Records that the cancel request for a connection's big-arg handling has
/// been acknowledged.
///
/// * `cid`  - id of the connection, used as the key into `io`.
/// * `_res` - completion result; not inspected.
/// * `io`   - per-connection state map.
///
/// With no pending big-arg state the connection is flagged for a recv
/// re-arm. In the `BareSetCancelling` state `cancel_acked` is set, and if
/// `target_canceled` is already set the connection moves on through
/// `transition_to_reading`. Any other state is left untouched.
pub(crate) fn uring_on_big_arg_cancel(
    &mut self,
    cid: u64,
    _res: i32,
    io: &mut KevyMap<u64, UringConn>,
) {
    let Some(conn) = io.get_mut(&cid) else { return };
    let both_done = match conn.pending_big_arg.as_deref_mut() {
        None => {
            // Nothing is being cancelled: just ask for the recv to be re-armed.
            conn.big_arg_rearm_recv = true;
            self.mark_arm_pending(cid, io);
            return;
        }
        Some(BigArgState::BareSetCancelling {
            cancel_acked,
            target_canceled,
            ..
        }) => {
            *cancel_acked = true;
            // `cancel_acked` was just set, so only the other half decides.
            *target_canceled
        }
        Some(_) => return,
    };
    if both_done {
        self.transition_to_reading(cid, io);
    }
}
/// Records that the operation targeted by the cancel has itself completed
/// as canceled.
///
/// * `cid` - id of the connection, used as the key into `io`.
/// * `io`  - per-connection state map.
///
/// With no pending big-arg state the connection is flagged for a recv
/// re-arm. In the `BareSetCancelling` state `target_canceled` is set and
/// `recv_armed` is cleared; if `cancel_acked` is already set the connection
/// moves on through `transition_to_reading`. Any other state is left
/// untouched.
pub(crate) fn uring_on_big_arg_target_canceled(
    &mut self,
    cid: u64,
    io: &mut KevyMap<u64, UringConn>,
) {
    let Some(conn) = io.get_mut(&cid) else { return };
    let both_done = match conn.pending_big_arg.as_deref_mut() {
        None => {
            // Nothing is being cancelled: just ask for the recv to be re-armed.
            conn.big_arg_rearm_recv = true;
            self.mark_arm_pending(cid, io);
            return;
        }
        Some(BigArgState::BareSetCancelling {
            cancel_acked,
            target_canceled,
            ..
        }) => {
            *target_canceled = true;
            // `target_canceled` was just set, so only the other half decides.
            *cancel_acked
        }
        Some(_) => return,
    };
    // Cleared only on the `BareSetCancelling` path, before any transition.
    conn.recv_armed = false;
    if both_done {
        self.transition_to_reading(cid, io);
    }
}
/// Leaves the `BareSetCancelling` state for connection `cid`.
///
/// * `cid` - id of the connection, used as the key into `io`.
/// * `io`  - per-connection state map.
///
/// If the body is already complete (`body.len() == body_len`) and both
/// trailing bytes have been counted (`crlf_seen == 2`), the SET is
/// dispatched immediately and the connection is flagged for a recv re-arm.
/// Otherwise the same key/body/counters are stored back as
/// `BareSetReading` and a big-arg read is requested.
///
/// The call is a no-op when the connection is unknown or when its pending
/// state is absent or is not `BareSetCancelling`; in particular a pending
/// state of another variant is left in place rather than being dropped.
pub(crate) fn transition_to_reading(
    &mut self,
    cid: u64,
    io: &mut KevyMap<u64, UringConn>,
) {
    let Some(uc) = io.get_mut(&cid) else { return };
    // Check the variant BEFORE taking the state out of the connection:
    // taking first and bailing on a mismatch would silently drop the key and
    // the partially received body of whatever state was stored there.
    if !matches!(
        uc.pending_big_arg.as_deref(),
        Some(BigArgState::BareSetCancelling { .. })
    ) {
        return;
    }
    let Some(state) = uc.pending_big_arg.take() else { return };
    let BigArgState::BareSetCancelling {
        key,
        body,
        body_len,
        crlf_seen,
        ..
    } = *state
    else {
        // Not reachable: the variant was verified just above.
        return;
    };
    if body.len() == body_len && crlf_seen == 2 {
        // Everything already arrived: apply the SET now.
        self.dispatch_bareset_owned(cid, key, body, body_len, io);
        // Re-borrow the connection; `uc` cannot be held across the dispatch
        // call, which takes `io` itself.
        if let Some(uc) = io.get_mut(&cid) {
            uc.big_arg_rearm_recv = true;
        }
        self.mark_arm_pending(cid, io);
        return;
    }
    // More bytes are still expected: carry the buffers over into the
    // reading state and request a big-arg read.
    uc.pending_big_arg = Some(Box::new(BigArgState::BareSetReading {
        key,
        body,
        body_len,
        crlf_seen,
    }));
    uc.big_arg_read_pending = true;
    self.mark_arm_pending(cid, io);
}
/// Applies a fully received `SET key body` whose key and body buffers are
/// owned by the caller, then queues the `+OK` reply for connection `cid`.
///
/// * `cid`      - connection that issued the command.
/// * `key`      - key bytes.
/// * `body`     - value bytes; moved into the store.
/// * `body_len` - expected body length; checked against `body` in debug
///   builds only (both its length and its capacity).
/// * `io`       - per-connection state map, forwarded to `mark_arm_pending`.
///
/// Steps run in this order: AOF logging (when `aof` is set), replication
/// push (when a replication source exists and a replicated command is not
/// currently being applied), dispatch notification, key wake-up, store
/// write, watch bump, wake-ups drained from the Lua bridge, reply, arm.
pub(crate) fn dispatch_bareset_owned(
    &mut self,
    cid: u64,
    key: Vec<u8>,
    body: Vec<u8>,
    body_len: usize,
    io: &mut KevyMap<u64, UringConn>,
) {
    debug_assert_eq!(body.len(), body_len);
    debug_assert_eq!(body.capacity(), body_len);

    // Present the command as a three-element argv without copying anything.
    let argv = ThreeSliceView {
        verb: b"SET",
        key: &key,
        body: &body,
    };
    if self.aof.is_some() {
        self.log_write(&argv);
    }
    if let Some(sink) = self.replicate.as_mut()
        && !crate::replication_gate::is_applying_replicated()
    {
        sink.push_mutation(&argv);
    }
    self.maybe_notify_dispatch(&argv);

    self.wake_key(&key);
    // `argv` is no longer used past this point, so `body` can be moved into
    // the store.
    let _stored = self.store.set(&key, body, None, false, false);
    self.store.bump_if_watched(&key);

    for woken in crate::lua_wake_bridge::drain_lua_wake_buffer() {
        self.wake_key(&woken);
    }

    if let Some(conn) = self.conns.get_mut(&cid) {
        conn.output.extend_from_slice(b"+OK\r\n");
    }
    self.mark_arm_pending(cid, io);
}
}
/// Borrowed, fixed-size argument vector made of exactly three byte slices.
pub(crate) struct ThreeSliceView<'a> {
    /// Element 0 of the argument vector.
    pub(crate) verb: &'a [u8],
    /// Element 1 of the argument vector.
    pub(crate) key: &'a [u8],
    /// Element 2 of the argument vector.
    pub(crate) body: &'a [u8],
}

impl core::ops::Index<usize> for ThreeSliceView<'_> {
    type Output = [u8];

    /// Returns element `i` (0 = verb, 1 = key, 2 = body).
    ///
    /// # Panics
    ///
    /// Panics when `i` is greater than 2.
    fn index(&self, i: usize) -> &[u8] {
        let slots = [self.verb, self.key, self.body];
        match slots.get(i) {
            // Copy the inner slice reference out so the result does not
            // borrow the local array.
            Some(&slice) => slice,
            None => panic!("ThreeSliceView index oob: {i}"),
        }
    }
}
impl kevy_resp::ArgvView for ThreeSliceView<'_> {
    /// Number of arguments in the view; always 3.
    fn len(&self) -> usize {
        3
    }

    /// Returns argument `i` (0 = verb, 1 = key, 2 = body), or `None` when
    /// `i` is out of range.
    fn get(&self, i: usize) -> Option<&[u8]> {
        [self.verb, self.key, self.body].get(i).copied()
    }
}