1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
//! The outbound half of the shard transport: cross-core sends (ring push +
//! backlog spill + coalesced wakeups) and connection output flushing. Split
//! out of [`crate::shard`] to keep that file under the 500-LOC house rule —
//! every method here is on the same `impl<C: Commands> Shard<C>`.
use crate::Commands;
use crate::message::Inbound;
use crate::shard::Shard;
use std::io;
use std::sync::atomic::{Ordering, fence};
impl<C: Commands> Shard<C> {
    /// Wake every target enqueued to this iteration that is currently parked.
    /// A spinning peer needs no syscall — it will see the message on its next
    /// poll(0). This is what removes the per-message wakeup under load.
    ///
    /// Consumes `pending_wakes` (bit `i` = shard `i` was sent to since the
    /// last flush) and resets it to 0.
    #[inline]
    pub(crate) fn flush_wakes(&mut self) {
        // Hot path: short-circuit on the bitmap. Replaces the previous
        // `Vec<bool>::iter().any(|&w| w)`, which was N byte loads per
        // iter — the early-return alone showed up as 2.6 % of -c1 CPU
        // in the 2026-06-20 profile.
        if self.pending_wakes == 0 {
            return;
        }
        // Close the park/wake race: the SeqCst fence pairs with the
        // matching fence in `Shard::run` after a peer stores `parked=true`.
        // Combined, they guarantee: if our ring push (Release on the
        // outbox's tail, executed earlier this iteration via `send_to`)
        // happens-before this load, AND the peer's parked-store
        // happens-before its post-park drain, then either
        //   (a) the peer's drain sees our push, OR
        //   (b) our load sees `parked=true` and we send the wake.
        // Loom-verified by `kevy-rt/tests/loom.rs::no_wake_implies_drained`.
        // Without the fence the lost-wake window was bounded by the
        // peer's `PARK_TIMEOUT_MS` (50 ms); the timeout remains as
        // defense-in-depth against missed eventfd writes / OS hiccups.
        fence(Ordering::SeqCst);
        let mut mask = self.pending_wakes;
        self.pending_wakes = 0;
        while mask != 0 {
            let i = mask.trailing_zeros() as usize;
            mask &= mask - 1; // clear the lowest set bit
            if self.parked[i].load(Ordering::SeqCst) {
                // Best-effort: a failed wake is covered by the peer's park
                // timeout (see the fence comment above), so the result is
                // deliberately ignored.
                let _ = self.wakers[i].wake();
            }
        }
    }

    /// Flush connections a PUBLISH appended output to this iteration (epoll path;
    /// the io_uring reactor flushes them via its arm/write loop instead).
    ///
    /// # Errors
    ///
    /// Propagates the first error from [`Self::flush_conn`]. The id that failed
    /// has already been popped; ids not yet visited stay in `dirty`.
    #[inline]
    pub(crate) fn flush_dirty(&mut self) -> io::Result<()> {
        if self.dirty.is_empty() {
            return Ok(());
        }
        while let Some(id) = self.dirty.pop() {
            self.flush_conn(id)?;
        }
        Ok(())
    }

    /// Enqueue a message to another shard, marking it for a coalesced wakeup. The
    /// fast path is a lock-free ring push; on a full ring it spills to the local
    /// per-target backlog (preserving order), which `flush_backlog` drains later.
    ///
    /// Shard ids are used as bit positions in `u64` bitmaps, so this assumes
    /// fewer than 64 shards (presumably enforced at construction — confirm).
    pub(crate) fn send_to(&mut self, dst: usize, msg: Inbound) {
        let bit = 1u64 << dst;
        if self.backlog_nonempty & bit == 0 {
            match self.outboxes[dst].as_mut() {
                Some(p) => {
                    if let Err(m) = p.push(msg) {
                        self.backlog[dst].push_back(m);
                        self.backlog_nonempty |= bit;
                    }
                }
                // `dst == self.id` has no ring and is never sent to.
                None => return,
            }
        } else {
            // Order: queue behind the existing backlog rather than jumping the ring.
            self.backlog[dst].push_back(msg);
        }
        // Tell `dst`'s reactor it has incoming work from us. Release pairs
        // with the AcqRel swap in `drain_inbound_core` — anything our push
        // wrote into the ring is visible to the drain that observes our bit.
        self.inbound_dirty[dst].fetch_or(1u64 << self.id, Ordering::Release);
        self.pending_wakes |= bit;
    }

    /// Re-push each per-target backlog into its ring (filled when a ring was
    /// full on an earlier `send_to`). Targets are flushed independently: a
    /// target whose ring fills up again keeps the rest of its backlog (and its
    /// `backlog_nonempty` bit) for the next iteration, while the remaining
    /// targets are still flushed.
    ///
    /// Every target that received at least one message gets our bit in its
    /// `inbound_dirty` word and is marked in `pending_wakes`, exactly like
    /// `send_to` — without the dirty bit the peer's drain could skip the
    /// messages re-pushed here.
    #[inline]
    pub(crate) fn flush_backlog(&mut self) {
        // Hot path: short-circuit on the bitmap. Replaces the previous
        // `backlog.iter().all(VecDeque::is_empty)`, which was N struct
        // accesses per iter — the early-return alone showed up as 2.5 %
        // of -c1 CPU in the 2026-06-20 profile.
        if self.backlog_nonempty == 0 {
            return;
        }
        let my_bit = 1u64 << self.id;
        let mut mask = self.backlog_nonempty;
        while mask != 0 {
            let dst = mask.trailing_zeros() as usize;
            mask &= mask - 1; // clear the lowest set bit
            let bit = 1u64 << dst;
            let Some(p) = self.outboxes[dst].as_mut() else {
                // No ring (our own id): nothing can ever drain this backlog.
                self.backlog[dst].clear();
                self.backlog_nonempty &= !bit;
                continue;
            };
            let mut pushed = false;
            while let Some(msg) = self.backlog[dst].pop_front() {
                if let Err(m) = p.push(msg) {
                    // Ring full again: put it back at the front to keep order.
                    self.backlog[dst].push_front(m);
                    break;
                }
                pushed = true;
            }
            if pushed {
                // Same signal as `send_to`: Release after the pushes so the
                // drain that observes our bit also sees the ring contents.
                self.inbound_dirty[dst].fetch_or(my_bit, Ordering::Release);
                self.pending_wakes |= bit;
            }
            if self.backlog[dst].is_empty() {
                self.backlog_nonempty &= !bit;
            }
            // Otherwise still non-empty — leave the bit set for next iter.
        }
    }

    /// Write a connection's staged output to its socket: drain until done or
    /// WouldBlock, drop the conn once closing + fully drained, and keep the
    /// poller's write-interest in sync with whether output remains.
    ///
    /// Any write error other than `WouldBlock` / `Interrupted` is treated as
    /// fatal: the conn is marked closing and its undeliverable output is
    /// discarded, so the close check is not blocked by bytes that can never
    /// be written.
    ///
    /// # Errors
    ///
    /// Returns the error from the poller's `modify` call; socket write errors
    /// are not propagated.
    pub(crate) fn flush_conn(&mut self, conn_id: u64) -> io::Result<()> {
        let (close, want_write, fd) = {
            let Some(conn) = self.conns.get_mut(&conn_id) else {
                return Ok(());
            };
            while conn.write_pos < conn.output.len() {
                match conn.sock.write(&conn.output[conn.write_pos..]) {
                    // No progress; keep the remainder and wait for writability.
                    Ok(0) => break,
                    Ok(n) => conn.write_pos += n,
                    Err(e) if e.kind() == io::ErrorKind::WouldBlock => break,
                    Err(e) if e.kind() == io::ErrorKind::Interrupted => {} // retry the write
                    Err(_) => {
                        // Fatal for this socket: the staged bytes can never be
                        // delivered. Drop them, otherwise `out_remaining` stays
                        // true forever and the conn is never closed here.
                        conn.closing = true;
                        conn.output.clear();
                        conn.write_pos = 0;
                        break;
                    }
                }
            }
            if conn.write_pos == conn.output.len() {
                conn.output.clear();
                conn.write_pos = 0;
            }
            let out_remaining = conn.write_pos < conn.output.len();
            let close = conn.closing && conn.pending.is_empty() && !out_remaining;
            (close, out_remaining, conn.sock.raw())
        };
        if close {
            self.close_conn(conn_id);
            return Ok(());
        }
        // Only touch the poller when write-interest actually changes.
        if let Some(conn) = self.conns.get_mut(&conn_id)
            && want_write != conn.want_write
        {
            conn.want_write = want_write;
            self.poller.modify(fd, true, want_write)?;
        }
        Ok(())
    }
}