1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
//! The cross-core drain + connection-reap half of the io_uring reactor.
//! Split out of [`crate::uring_reactor`] to keep that file under the
//! 500-LOC house rule — every method here is on the same
//! `impl<C: Commands> Shard<C>` and only ever called from `run_uring`.
use crate::Commands;
use crate::shard::Shard;
use crate::uring_reactor::UringConn;
use kevy_map::KevyMap;
impl<C: Commands> Shard<C> {
/// Drain cross-core rings: execute forwarded requests, fold replies into
/// their connection's output (no direct write — the io_uring arm/write
/// loop flushes it). The message handling itself is
/// [`Shard::drain_inbound_core`], shared with the epoll reactor.
pub(crate) fn uring_drain_inbound(&mut self) -> bool {
self.drain_inbound_core::<false>()
.expect("DIRECT_FLUSH=false drain has no fallible step")
}
/// Close connections that are done: EOF/QUIT seen, all output flushed, no
/// SQE in flight. Dropping the `Conn` closes the fd.
pub(crate) fn uring_reap_closed(&mut self, io: &mut KevyMap<u64, UringConn>) {
let done: Vec<u64> = io
.iter()
.filter(|(cid, uc)| {
let conn = self.conns.get(cid);
let drained = conn.is_none_or(|c| {
c.output.is_empty() && c.pending.is_empty() && c.write_pos == 0
});
let closing = uc.closing || conn.is_some_and(|c| c.closing);
// The multishot recv may still be armed; closing the fd (on Conn
// drop) terminates it and its final completion is ignored (conn
// gone). We only need writes fully flushed before closing.
closing && !uc.write_inflight && uc.write_buf.is_empty() && drained
})
.map(|(&cid, _)| cid)
.collect();
for cid in done {
// Use the shared teardown (not a local conns.remove): it also
// cancels block waiters (local + cross-shard arbiter) and drops
// pub/sub + pattern subscriptions. Skipping it leaked a parked
// BLPOP/XREAD waiter and psub registrations on every io_uring
// disconnect — a waiter left behind could consume a later push
// meant for a live client. The epoll-only `poller.delete` /
// `fd_to_conn` steps inside are harmless no-ops here (io_uring
// never registered the fd with the readiness poller).
self.close_conn(cid);
io.remove(&cid);
}
}
}