1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
//! Per-connection state owned by its origin shard.
use crate::message::PendingSlot;
use kevy_resp::{Argv, RespVersion};
use kevy_sys::Socket;
use std::collections::{HashSet, VecDeque};
use std::sync::Arc;
/// Per-connection state owned by its origin shard.
///
/// A4 (2026-06-20): `#[repr(C)]` + hot-first field order. The H7 +
/// post-v1.24 diagnostics both showed L1D-miss in the busy-poll
/// reactor body. Per-request work touches `sock` (raw fd for write
/// SQE), `input`/`output` (recv → parse → reply), `pending` (push +
/// pop + front), `write_pos`, `next_seq`, `next_emit`, plus the five
/// boolean / enum flags. The four collection fields (`sub`, `psub`,
/// `multi`, `watched`) are PUBLISH / MULTI / WATCH only — touched
/// zero times on the steady-state GET/SET loop.
///
/// Layout under `#[repr(C)]`:
///
/// ```text
/// offset bytes field
/// 0 4 sock (raw fd)
/// 8 24 input (Vec ptr + len + cap)
/// 32 24 output (Vec ptr + len + cap)
/// 56 32 pending (VecDeque<PendingSlot>)
/// 88 8 write_pos
/// 96 8 next_seq
/// 104 8 next_emit
/// 112 1 proto (enum tag)
/// 113 1 closing
/// 114 1 want_write
/// 115 1 blocked
/// 116 1 cluster
/// 117 1 pending_write
/// ──── ~120 B = 2 cache lines worth of hot state ────
/// COLD
/// ? ? sub, psub, multi, watched
/// ```
///
/// Without `#[repr(C)]`, the default `repr(Rust)` layout reorders for
/// alignment + size and intersperses cold collections among the hot
/// flags. With this attribute the hot fields are guaranteed
/// contiguous at the front of the struct, fitting two cache lines.
#[repr(C)]
pub(crate) struct Conn {
// ── HOT (~2 cache lines, touched on every recv/dispatch/write iter) ──
pub(crate) sock: Socket,
pub(crate) input: Vec<u8>,
pub(crate) output: Vec<u8>,
/// L1 (2026-06-21): Arc-backed value bytes scheduled to be `writev`-sent
/// alongside `output`. Each `(pos, arc)` means "insert `arc.as_ref()`
/// after byte `pos` in `output` when building the iovec list for the
/// reactor's writev SQE". Sorted by `pos`; pushed by `encode_bulk_arc`
/// at the bulk-reply emit site. Empty in the common small-reply path,
/// so the reactor stays on `prep_write` with no overhead.
pub(crate) output_arcs: Vec<(usize, Arc<Box<[u8]>>)>,
/// Outstanding commands in seq order; front == `next_emit`. An O(1) ring
/// that replaces the per-command HashMap churn.
pub(crate) pending: VecDeque<PendingSlot>,
pub(crate) write_pos: usize,
/// Next seq to assign (== `next_emit + pending.len()`).
pub(crate) next_seq: u64,
/// Seq of `pending.front()` — the next reply to emit.
pub(crate) next_emit: u64,
/// Which RESP version this connection speaks. Negotiated via
/// `HELLO`: a fresh conn defaults to RESP2 (Redis 6.x and earlier);
/// the conn switches to RESP3 when the client sends `HELLO 3`, at
/// which point `dispatch_into_resp3` becomes the per-command reply
/// encoder.
pub(crate) proto: RespVersion,
/// QUIT / EOF / protocol error seen — close once drained & flushed.
pub(crate) closing: bool,
pub(crate) want_write: bool,
/// Set while this conn is parked in a `BLPOP` / `BRPOP` /
/// `XREAD BLOCK` waiter. The dispatcher refuses new command processing
/// on this conn until a wake (write to a watched key) or a tick-driven
/// timeout clears the flag.
pub(crate) blocked: bool,
/// Accepted on this shard's per-shard cluster listener (vs the shared
/// SO_REUSEPORT compat port). Cluster conns get `-MOVED` for
/// wrong-shard single-key commands instead of transparent forwarding.
pub(crate) cluster: bool,
/// **H1.C (v1.25)**: dedup flag for `Shard::dirty`. PUBLISH fan-out at
/// N subscribers × M pipelined publishes used to push `N×M` ids onto
/// the dirty list, then `flush_dirty` paid `N×M` `HashMap::get_mut`
/// probes to no-op the redundant entries (output drained on the
/// first flush_conn for each conn). Mirrors redis's
/// `CLIENT_PENDING_WRITE` flag (`networking.c:284-302`):
/// `deliver_publish` only pushes a conn id if `!pending_write`, then
/// sets the flag; `flush_conn` clears it once output is drained.
/// Effect: dirty list shrinks from `N×M` to at most `N` (one entry
/// per actually-touched conn per drain).
pub(crate) pending_write: bool,
// ── COLD (PUBLISH / MULTI / WATCH only; never touched on the
// GET/SET steady state) ──
/// Channels this connection is subscribed to (pub/sub).
pub(crate) sub: HashSet<Vec<u8>>,
/// Glob patterns this connection has `PSUBSCRIBE`-d. Disjoint from
/// `sub` — a PUBLISH that matches both yields one `message` and one
/// `pmessage` frame (Redis semantics).
pub(crate) psub: HashSet<Vec<u8>>,
/// Queued commands inside a MULTI…EXEC transaction (`None` = not in
/// MULTI).
pub(crate) multi: Option<Vec<Argv>>,
/// `WATCH`-ed keys + the version each had on its owning shard at
/// `WATCH` time. `EXEC` fans these out via `Op::CheckWatch`; if any
/// shard reports a mismatch, the transaction aborts (nil multi-bulk).
pub(crate) watched: Vec<(Vec<u8>, u64)>,
/// **v2.0.16** — `CLIENT SETNAME` persisted value. Set by
/// `handle_command`'s CLIENT-intercept arm; read by the same
/// arm on `CLIENT GETNAME`. Empty by default (matches Redis).
/// Lives in the cold section since CLIENT SETNAME is a
/// once-per-connection housekeeping op. Closes v1.52.x finding.
pub(crate) client_name: Vec<u8>,
}
impl Conn {
pub(crate) fn new(sock: Socket) -> Self {
Conn {
sock,
input: Vec::new(),
output: Vec::new(),
output_arcs: Vec::new(),
write_pos: 0,
want_write: false,
next_seq: 0,
next_emit: 0,
closing: false,
pending: VecDeque::new(),
sub: HashSet::new(),
psub: HashSet::new(),
multi: None,
watched: Vec::new(),
client_name: Vec::new(),
proto: RespVersion::default(),
blocked: false,
cluster: false,
pending_write: false,
}
}
}