use crate::Commands;
use crate::conn::Conn;
use crate::shard::Shard;
pub(crate) use crate::uring_conn::UringConn;
use crate::uring_conn::ParkState;
use kevy_persist::{load_snapshot, replay_aof};
use kevy_sys::Socket;
use kevy_uring::{Completion, IoUring, KernelTimespec, ProvidedBufRing};
use kevy_map::KevyMap;
use std::io;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering, fence};
use std::time::{Duration, Instant};
/// Entry count passed to `IoUring::new` for each shard ring (also the
/// initial capacity of the per-iteration completion buffer).
const URING_ENTRIES: u32 = 256;
/// Consecutive idle iterations that busy-spin before the loop starts napping.
const URING_SPIN_LIMIT: u32 = 256;
/// Further idle iterations that sleep briefly before the shard parks on the ring.
const URING_NAP_LIMIT: u32 = 64;
/// Number of buffers in the provided-buffer ring used by multishot recv.
const PBUF_ENTRIES: u16 = 128;
/// Size in bytes of each provided buffer (16 KiB).
const PBUF_SIZE: u32 = 16 << 10;
/// Buffer-group id under which the provided-buffer ring is registered.
const PBUF_GROUP: u16 = 0;
/// Linux errno `ENOBUFS`; a recv completion carrying `-ENOBUFS` is not
/// treated as a connection failure.
const ENOBUFS: i32 = 105;
/// Reports whether an io_uring backend can be used on this host.
///
/// Probes by creating a ring with the same entry count the shard loop uses
/// and registering a provided-buffer ring with the same parameters. Both are
/// dropped again before returning; any failure yields `false`.
pub(crate) fn io_uring_available() -> bool {
    let Ok(probe) = IoUring::new(URING_ENTRIES) else {
        return false;
    };
    // Declared after `probe`, so it is dropped before the ring itself.
    let buf_ring = probe.register_buf_ring(PBUF_ENTRIES, PBUF_SIZE, PBUF_GROUP);
    buf_ring.is_ok()
}
/// Bit position of the operation tag inside an SQE/CQE `user_data` word.
/// The top three bits hold the tag; the low 61 bits hold a connection id for
/// per-connection operations.
const OP_SHIFT: u32 = 61;
/// Multishot recv on a connection.
const OP_RECV: u64 = 0b001 << OP_SHIFT;
/// Write of a connection's pending output.
const OP_WRITE: u64 = 0b010 << OP_SHIFT;
/// Accept on the client listener.
const OP_ACCEPT: u64 = 0b011 << OP_SHIFT;
/// Read on the shard waker fd while parked.
const OP_WAKER: u64 = 0b100 << OP_SHIFT;
/// Park timeout.
const OP_TIMEOUT: u64 = 0b101 << OP_SHIFT;
/// Accept on the cluster listener.
const OP_ACCEPT_CL: u64 = 0b110 << OP_SHIFT;
/// Mask selecting the connection-id bits (everything below the tag).
const CONN_MASK: u64 = u64::MAX >> (u64::BITS - OP_SHIFT);
impl<C: Commands> Shard<C> {
    /// Runs this shard's io_uring event loop until `stop` is set.
    ///
    /// Before serving, persisted state is restored (snapshot, then AOF
    /// replay). Each iteration then:
    /// 1. re-arms the client accept (and the cluster accept, if a cluster
    ///    listener exists) plus the per-connection write / multishot-recv SQEs;
    /// 2. submits without waiting and handles every available completion;
    /// 3. drains cross-shard inbound work and runs the flush, AOF-sync, reap
    ///    and tick housekeeping;
    /// 4. backs off when nothing happened: busy-spins for `URING_SPIN_LIMIT`
    ///    idle iterations, sleeps 200µs per iteration for `URING_NAP_LIMIT`
    ///    more, then parks on the ring.
    ///
    /// # Errors
    /// Returns an error if AOF replay fails, if the ring or its
    /// provided-buffer ring cannot be created, or if submitting to the ring
    /// fails.
    pub(crate) fn run_uring(mut self, stop: Arc<AtomicBool>) -> io::Result<()> {
        self.commands.on_shard_start(self.id);
        self.uring_restore_persisted()?;
        let mut ring = IoUring::new(URING_ENTRIES)?;
        let mut pbuf = ring.register_buf_ring(PBUF_ENTRIES, PBUF_SIZE, PBUF_GROUP)?;
        // io_uring-side state for each connection, keyed like `self.conns`.
        let mut io: KevyMap<u64, UringConn> = KevyMap::new();
        let mut accept_inflight = false;
        // Without a cluster listener, treat its accept as permanently in
        // flight so it is never armed.
        let mut cl_accept_inflight = self.cluster_listener.is_none();
        let mut comps: Vec<Completion> = Vec::with_capacity(URING_ENTRIES as usize);
        let mut idle_spins: u32 = 0;
        let mut park = ParkState::default();
        let mut woke_from_park = false;
        // A configured interval of 0 ms disables the periodic shard tick.
        let mut tick_interval = match self.commands.shard_tick_interval_ms() {
            0 => None,
            ms => Some(Duration::from_millis(ms)),
        };
        let mut last_tick = Instant::now();
        let mut tick_check_counter: u32 = 0;
        let mut reap_counter: u32 = 0;
        while !stop.load(Ordering::Relaxed) {
            if !accept_inflight {
                accept_inflight = ring.prep_accept(self.listener.raw(), OP_ACCEPT);
            }
            if !cl_accept_inflight
                && let Some(cl) = &self.cluster_listener
            {
                cl_accept_inflight = ring.prep_accept(cl.raw(), OP_ACCEPT_CL);
            }
            self.uring_arm_conns(&mut ring, &mut io, pbuf.group());
            ring.submit_and_wait(0)?;
            comps.clear();
            ring.for_each_completion(|c| comps.push(c));
            if !comps.is_empty() {
                self.store.refresh_clock();
            }
            let mut io_work = false;
            for c in &comps {
                let op = c.user_data & !CONN_MASK;
                let cid = c.user_data & CONN_MASK;
                match op {
                    OP_ACCEPT | OP_ACCEPT_CL => {
                        let cluster = op == OP_ACCEPT_CL;
                        if cluster {
                            cl_accept_inflight = false;
                        } else {
                            accept_inflight = false;
                        }
                        io_work = true;
                        self.uring_on_accept(c.res, cluster, &mut io);
                    }
                    OP_RECV => {
                        io_work = true;
                        self.uring_on_recv(cid, c, &mut io, &mut pbuf);
                    }
                    OP_WRITE => {
                        io_work = true;
                        self.uring_on_write(cid, c.res, &mut io);
                    }
                    OP_WAKER => {
                        park.waker_armed = false;
                        self.waker.drain();
                    }
                    OP_TIMEOUT => park.timeout_inflight = false,
                    _ => {}
                }
            }
            let did_inbound = self.uring_drain_inbound();
            self.dirty.clear();
            self.flush_backlog();
            self.flush_requests();
            self.flush_publish();
            self.flush_wakes();
            if let Some(aof) = &mut self.aof {
                let _ = aof.maybe_sync();
            }
            // Closed connections are reaped every 16th iteration.
            reap_counter = reap_counter.wrapping_add(1);
            if reap_counter & 0xF == 0 {
                self.uring_reap_closed(&mut io);
            }
            if let Some(iv) = tick_interval {
                // Timeout checks (and the clock read) run only every
                // `tick_check_every` iterations, or right after a park.
                tick_check_counter = tick_check_counter.wrapping_add(1);
                if tick_check_counter >= self.tick_check_every || woke_from_park {
                    tick_check_counter = 0;
                    let now = Instant::now();
                    self.tick_blocked_timeouts();
                    self.tick_xshard_timeouts();
                    if now.duration_since(last_tick) >= iv {
                        self.commands.on_shard_tick(&mut self.store);
                        self.apply_live_runtime_config(&mut tick_interval);
                        self.tick_persist();
                        last_tick = now;
                    }
                }
            }
            woke_from_park = false;
            let has_backlog = self.backlog.iter().any(|b| !b.is_empty());
            if !io_work && !did_inbound && !has_backlog {
                idle_spins = idle_spins.saturating_add(1);
                if idle_spins >= URING_SPIN_LIMIT + URING_NAP_LIMIT {
                    self.uring_park(&mut ring, &mut park)?;
                    woke_from_park = true;
                } else if idle_spins >= URING_SPIN_LIMIT {
                    std::thread::sleep(Duration::from_micros(200));
                }
            } else {
                idle_spins = 0;
            }
        }
        Ok(())
    }

    /// Restores persisted state before the loop starts serving.
    ///
    /// Loads the snapshot file if it exists; a load failure is logged to
    /// stderr and is not fatal. When AOF is enabled, the AOF is then replayed
    /// through the command dispatcher.
    ///
    /// # Errors
    /// Propagates AOF replay failures.
    fn uring_restore_persisted(&mut self) -> io::Result<()> {
        let snap = self.snapshot_path();
        if snap.exists()
            && let Err(e) = load_snapshot(&mut self.store, &snap)
        {
            eprintln!("kevy: shard {} failed to load {}: {e}", self.id, snap.display());
        }
        if self.aof.is_some() {
            let aof_path = self.aof_path();
            let commands = &self.commands;
            let store = &mut self.store;
            replay_aof(&aof_path, |args| {
                commands.dispatch(store, &args);
            })?;
        }
        Ok(())
    }

    /// Handles an accept completion from the client listener
    /// (`cluster == false`) or the cluster listener (`cluster == true`).
    ///
    /// A non-negative `res` is the accepted socket fd. It gets `TCP_NODELAY`
    /// (best effort), a fresh connection id, and entries in both `self.conns`
    /// and `io`. A negative `res` (failed accept) is ignored here; the caller
    /// re-arms the accept on its next iteration.
    fn uring_on_accept(&mut self, res: i32, cluster: bool, io: &mut KevyMap<u64, UringConn>) {
        if res < 0 {
            return;
        }
        // SAFETY: a non-negative accept result is a freshly created fd that
        // no other object in this process refers to yet.
        let sock = unsafe { Socket::from_raw_fd(res) };
        let _ = sock.set_nodelay();
        let cid = self.next_conn_id;
        self.next_conn_id += 1;
        let mut conn = Conn::new(sock);
        conn.cluster = cluster;
        self.conns.insert(cid, conn);
        io.insert(cid, UringConn::new());
    }

    /// Blocks the shard on the ring until at least one completion arrives.
    ///
    /// Publishes `parked[id] = true` (followed by a SeqCst fence) before a
    /// final inbound drain. If that drain finds work, it returns immediately.
    /// Otherwise it arms a read on the waker fd and a timeout of
    /// `park_timeout_ms` (at least 1 ms), unless they are already in flight,
    /// and waits for one completion. The completions themselves are handled
    /// by the caller's next loop iteration.
    ///
    /// A wait interrupted by a signal (`EINTR`) is treated as a spurious
    /// wake-up rather than an error. `parked[id]` is always cleared before
    /// returning, including on error.
    ///
    /// # Errors
    /// Returns any non-`Interrupted` error from submitting/waiting on the ring.
    fn uring_park(&mut self, ring: &mut IoUring, park: &mut ParkState) -> io::Result<()> {
        let me = self.id;
        self.parked[me].store(true, Ordering::SeqCst);
        fence(Ordering::SeqCst);
        if self.uring_drain_inbound() {
            self.parked[me].store(false, Ordering::SeqCst);
            return Ok(());
        }
        if !park.waker_armed {
            // SAFETY: `park.wake_buf` belongs to the `ParkState` that
            // `run_uring` keeps for the whole loop. In this file it is only
            // handed to the kernel while `waker_armed` is false, and that flag
            // stays set until the OP_WAKER completion is seen.
            park.waker_armed = unsafe {
                ring.prep_read(
                    self.waker.read_fd(),
                    park.wake_buf.as_mut_ptr(),
                    park.wake_buf.len() as u32,
                    OP_WAKER,
                )
            };
        }
        if !park.timeout_inflight {
            park.ts = KernelTimespec::from_millis(self.park_timeout_ms.max(1) as u64);
            // SAFETY: `park.ts` lives in the long-lived `ParkState` and is not
            // rewritten while `timeout_inflight` is set.
            park.timeout_inflight = unsafe { ring.prep_timeout(&park.ts, OP_TIMEOUT) };
        }
        let waited: io::Result<()> = if park.waker_armed || park.timeout_inflight {
            ring.submit_and_wait(1).map(|_| ())
        } else {
            Ok(())
        };
        // Clear the flag before inspecting the result so an error return never
        // leaves this shard advertised as parked.
        self.parked[me].store(false, Ordering::SeqCst);
        match waited {
            // A signal cut the wait short; any unsubmitted SQEs stay queued
            // and the in-flight flags remain set, so the next iteration
            // simply continues.
            Err(e) if e.kind() == io::ErrorKind::Interrupted => Ok(()),
            other => other,
        }
    }

    /// Queues per-connection SQEs for every connection that has `io` state.
    ///
    /// - When no write is in flight and the previous buffer is fully written,
    ///   the connection's pending `output` is swapped into `write_buf`.
    /// - Any unwritten tail of `write_buf` is submitted as a single write.
    /// - A multishot recv on buffer group `bgid` is (re-)armed unless one is
    ///   already armed or the connection is closing.
    ///
    /// Each `*_inflight` / `*_armed` flag is set only if the SQE was queued.
    fn uring_arm_conns(
        &mut self,
        ring: &mut IoUring,
        io: &mut KevyMap<u64, UringConn>,
        bgid: u16,
    ) {
        for (&cid, conn) in self.conns.iter_mut() {
            let Some(uc) = io.get_mut(&cid) else {
                continue;
            };
            if !uc.write_inflight && uc.write_buf.is_empty() && !conn.output.is_empty() {
                std::mem::swap(&mut uc.write_buf, &mut conn.output);
                uc.write_off = 0;
            }
            if !uc.write_inflight && uc.write_off < uc.write_buf.len() {
                // SQE lengths are u32. Clamp rather than truncate, so a tail of
                // 4 GiB or more is written in chunks (resumed via `write_off`)
                // instead of wrapping to a short or zero length.
                let len = u32::try_from(uc.write_buf.len() - uc.write_off).unwrap_or(u32::MAX);
                // SAFETY: `write_off < write_buf.len()`, so the pointer is in
                // bounds and `len` bytes are readable. In this file the buffer
                // is not swapped, cleared or advanced again until the OP_WRITE
                // completion resets `write_inflight`.
                let ok = unsafe {
                    ring.prep_write(
                        conn.sock.raw(),
                        uc.write_buf.as_ptr().add(uc.write_off),
                        len,
                        OP_WRITE | cid,
                    )
                };
                if ok {
                    uc.write_inflight = true;
                }
            }
            if !uc.recv_armed
                && !uc.closing
                && ring.prep_recv_multishot(conn.sock.raw(), bgid, OP_RECV | cid)
            {
                uc.recv_armed = true;
            }
        }
    }

    /// Handles one multishot-recv completion for connection `cid`.
    ///
    /// - If the kernel signals no further completions, `recv_armed` is
    ///   cleared so `uring_arm_conns` re-arms the recv.
    /// - A non-positive result other than `-ENOBUFS` (EOF or error) marks the
    ///   connection closing.
    /// - Data for a connection already marked closing is discarded. Its
    ///   blocked and cross-shard state has been dropped, so new commands must
    ///   not run.
    /// - Otherwise the payload is appended to the connection input, the
    ///   provided buffer is returned to the ring, and the buffered input is
    ///   dispatched inside one AOF group. Unconsumed bytes are kept for the
    ///   next recv, and a protocol error replies and marks the connection
    ///   closing.
    fn uring_on_recv(
        &mut self,
        cid: u64,
        c: &Completion,
        io: &mut KevyMap<u64, UringConn>,
        pbuf: &mut ProvidedBufRing,
    ) {
        let closing = match io.get_mut(&cid) {
            Some(uc) => {
                if !c.has_more() {
                    uc.recv_armed = false;
                }
                uc.closing
            }
            None => false,
        };
        if c.res <= 0 {
            // -ENOBUFS only means the buffer ring ran dry; the recv is re-armed.
            if c.res != -ENOBUFS {
                self.uring_mark_closing(cid, io);
            }
            return;
        }
        let Some(bid) = c.buffer_id() else {
            return;
        };
        if closing {
            pbuf.recycle(bid);
            return;
        }
        let n = c.res as usize;
        let Some(conn) = self.conns.get_mut(&cid) else {
            pbuf.recycle(bid);
            return;
        };
        conn.input.extend_from_slice(pbuf.bytes(bid, n));
        pbuf.recycle(bid);
        // Move the input out so `dispatch_batch` can borrow `self` mutably.
        let mut input_buf = std::mem::take(&mut conn.input);
        self.aof_begin_group();
        let outcome = self.dispatch_batch(cid, &input_buf);
        self.aof_end_group_logged();
        if outcome.conn_gone {
            return;
        }
        input_buf.drain(..outcome.consumed);
        if let Some(conn) = self.conns.get_mut(&cid) {
            conn.input = input_buf;
        }
        if outcome.protocol_error {
            self.protocol_error(cid);
            self.uring_mark_closing(cid, io);
        }
    }

    /// Marks connection `cid` as closing. No new recv is armed for it. Its
    /// blocked-command registrations and in-flight cross-shard requests are
    /// dropped. Actual teardown is left to `uring_reap_closed`.
    fn uring_mark_closing(&mut self, cid: u64, io: &mut KevyMap<u64, UringConn>) {
        if let Some(uc) = io.get_mut(&cid) {
            uc.closing = true;
        }
        self.blocked.drop_for_conn(cid);
        self.cancel_xshard_on_close(cid);
    }

    /// Handles a write completion for connection `cid` with result `res`.
    ///
    /// On error, unsent output is discarded so it is not resubmitted to a
    /// broken socket, and the connection is marked closing. On success,
    /// `write_off` advances by the bytes written, and the buffer is reset once
    /// fully written. Partial writes are resumed by `uring_arm_conns`.
    fn uring_on_write(&mut self, cid: u64, res: i32, io: &mut KevyMap<u64, UringConn>) {
        let Some(uc) = io.get_mut(&cid) else {
            return;
        };
        uc.write_inflight = false;
        if res < 0 {
            uc.write_buf.clear();
            uc.write_off = 0;
            if let Some(conn) = self.conns.get_mut(&cid) {
                conn.output.clear();
            }
            self.uring_mark_closing(cid, io);
            return;
        }
        uc.write_off += res as usize;
        if uc.write_off >= uc.write_buf.len() {
            uc.write_buf.clear();
            uc.write_off = 0;
        }
    }
}