1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
//! The public entry point: configure and run the thread-per-core server.
use crate::Commands;
use crate::message::{Inbound, PubSubPatternReg, PubSubReg};
use crate::shard::Shard;
use kevy_map::KevyMap;
use kevy_persist::{Aof, Fsync};
use kevy_ring::{Consumer, Producer};
use kevy_store::Store;
use kevy_sys::{Poller, Waker, tcp_listen_reuseport, waker};
use std::collections::{HashMap, VecDeque};
use std::io;
use std::path::PathBuf;
use std::sync::atomic::AtomicBool;
use std::sync::{Arc, RwLock};
/// Slot count used for every per-core-pair SPSC ring unless overridden.
///
/// This is only the size of the lock-free fast path: when a ring is full
/// the sender spills into a local backlog (see [`Shard`]), so it does not
/// cap how many messages can be in flight. Override it with the
/// `[advanced] ring_capacity` config field, which reaches the runtime
/// through [`Runtime::with_advanced`].
const DEFAULT_RING_CAPACITY: usize = 1 << 10;
/// The public entry point: configure and run the thread-per-core server.
///
/// Built with [`Runtime::new`] plus the `with_*` builder methods, then
/// started with [`Runtime::run`].
pub struct Runtime<C: Commands> {
    // --- listener / sharding -------------------------------------------
    /// IPv4 address octets every shard's listener binds to.
    ip: [u8; 4],
    /// TCP port shared (via SO_REUSEPORT) by all shard listeners.
    port: u16,
    /// Number of shard threads; `Runtime::new` clamps this to at least 1.
    nshards: usize,
    /// Command table; each shard receives its own clone.
    commands: C,

    // --- persistence ---------------------------------------------------
    /// Where per-shard snapshots (`dump-<id>.rdb`) and AOF logs
    /// (`aof-<id>.aof`) live.
    data_dir: PathBuf,
    /// `true` when each shard should open an append-only log.
    enable_aof: bool,
    /// AOF fsync policy; defaults to `EverySec`, as in Redis.
    appendfsync: Fsync,
    /// Growth percentage over the size at the last rewrite that triggers an
    /// automatic BGREWRITEAOF. `0` turns auto-rewrite off; default `100`
    /// (the Redis default).
    auto_aof_rewrite_pct: u32,
    /// Minimum AOF size before auto-rewrite is considered (default 64 MiB).
    auto_aof_rewrite_min_size: u64,

    // --- reactor tuning ------------------------------------------------
    /// Busy-poll iterations before a shard parks. Kept as `u32` because it
    /// is copied verbatim into the matching [`Shard`] field.
    spin_limit: u32,
    /// How long (ms) a parked shard blocks waiting for events.
    park_timeout_ms: u32,
    /// How often (in loop iterations) the reactor reads the wall clock for
    /// its tick work: TTL reaper, live config refresh, auto-AOF-rewrite.
    tick_check_every: u32,
    /// Slots per SPSC ring; see [`DEFAULT_RING_CAPACITY`].
    ring_capacity: usize,

    // --- slowlog -------------------------------------------------------
    /// `[slowlog].slower_than_micros`; the default (10 ms) matches Redis.
    slowlog_slower_than_micros: i64,
    /// `[slowlog].max_len`: per-shard entry cap.
    slowlog_max_len: u32,
}
impl<C: Commands> Runtime<C> {
    /// Create a runtime listening on `ip:port` with `nshards` shard threads
    /// (clamped to at least 1), each running a clone of `commands`.
    ///
    /// Every other knob starts at its default:
    /// - data dir `.`;
    /// - AOF enabled with `EverySec` fsync;
    /// - auto-rewrite at 100 % / 64 MiB;
    /// - ring capacity [`DEFAULT_RING_CAPACITY`];
    /// - spin limit 256, park timeout 50 ms, tick check every 256
    ///   iterations;
    /// - slowlog threshold 10 ms, 128 entries.
    pub fn new(ip: [u8; 4], port: u16, nshards: usize, commands: C) -> Self {
        Runtime {
            ip,
            port,
            nshards: nshards.max(1),
            commands,
            data_dir: PathBuf::from("."),
            enable_aof: true,
            appendfsync: Fsync::EverySec,
            auto_aof_rewrite_pct: 100,
            auto_aof_rewrite_min_size: 64 * 1024 * 1024,
            ring_capacity: DEFAULT_RING_CAPACITY,
            spin_limit: 256,
            park_timeout_ms: 50,
            tick_check_every: 256,
            slowlog_slower_than_micros: 10_000,
            slowlog_max_len: 128,
        }
    }

    /// SLOWLOG tuning (`[slowlog]` config section).
    ///
    /// Defaults: record any command slower than 10 ms (`10_000` µs) and keep
    /// the 128 most recent entries per shard. Pass
    /// `slower_than_micros = -1` to disable (zero hot-path cost — no
    /// `Instant::now()` taken).
    pub fn with_slowlog(mut self, slower_than_micros: i64, max_len: u32) -> Self {
        self.slowlog_slower_than_micros = slower_than_micros;
        self.slowlog_max_len = max_len;
        self
    }

    /// Reactor tuning knobs (`[advanced]` config section).
    ///
    /// Defaults match the pre-v1.4 hardcoded constants. `ring_capacity` is
    /// applied when the SPSC rings are built (startup only). The other three
    /// are read on each iteration of the reactor loop, so values set here
    /// take effect from the next `shard.run()` call.
    ///
    /// NOTE(review): none of these values is validated here. `ring_capacity
    /// = 0` goes straight to `kevy_ring::ring`, and `tick_check_every = 0`
    /// goes straight to the shard loop. Confirm both tolerate zero, or clamp
    /// them.
    pub fn with_advanced(
        mut self,
        spin_limit: u32,
        park_timeout_ms: u32,
        tick_check_every: u32,
        ring_capacity: usize,
    ) -> Self {
        self.spin_limit = spin_limit;
        self.park_timeout_ms = park_timeout_ms;
        self.tick_check_every = tick_check_every;
        self.ring_capacity = ring_capacity;
        self
    }

    /// Set the directory where shards snapshot to / load from. Default: `.`.
    pub fn with_data_dir(mut self, dir: impl Into<PathBuf>) -> Self {
        self.data_dir = dir.into();
        self
    }

    /// Enable/disable the append-only log. Default: enabled.
    pub fn with_aof(mut self, on: bool) -> Self {
        self.enable_aof = on;
        self
    }

    /// fsync policy for the AOF.
    ///
    /// - `EverySec` (default, matches Redis): lose at most ~1 s of writes on
    ///   a crash.
    /// - `Always`: zero-loss, at roughly 50 % throughput.
    /// - `No`: leaves everything to the OS pagecache.
    pub fn with_appendfsync(mut self, fsync: Fsync) -> Self {
        self.appendfsync = fsync;
        self
    }

    /// Auto-trigger BGREWRITEAOF when both of these hold:
    /// - the live AOF has grown by at least `pct` percent over its size at
    ///   the previous rewrite;
    /// - the live AOF is at least `min_size` bytes.
    ///
    /// `pct = 0` disables auto-rewrite; clients can still run BGREWRITEAOF
    /// manually. Defaults: 100 % / 64 MiB.
    pub fn with_auto_aof_rewrite(mut self, pct: u32, min_size: u64) -> Self {
        self.auto_aof_rewrite_pct = pct;
        self.auto_aof_rewrite_min_size = min_size;
        self
    }

    /// Spawn one thread per shard and run until `stop` is set.
    ///
    /// Every shard (listener, AOF, poller) is built before any thread is
    /// spawned, so a bind/open failure returns `Err` without starting the
    /// server. Once running:
    /// - a shard that returns an error or panics is reported on stderr;
    /// - the other shards keep running until `stop` is set;
    /// - `run` returns `Ok(())` after every shard thread has exited.
    ///
    /// # Errors
    ///
    /// Returns the underlying I/O error if any of these fail:
    /// - creating a waker, listener, AOF file or poller;
    /// - spawning a shard thread. In this case `stop` is set so the shards
    ///   that already started shut down, and they are joined before the
    ///   error is returned.
    pub fn run(self, stop: Arc<AtomicBool>) -> io::Result<()> {
        let n = self.nshards;
        // One lock-free SPSC ring per ordered core-pair (i→j). The producer
        // goes to shard i's outbox[j] and the consumer to shard j's
        // inbox[i]. There is no self-ring: a shard runs its own commands
        // inline, never over a ring.
        let mut outboxes: Vec<Vec<Option<Producer<Inbound>>>> =
            (0..n).map(|_| (0..n).map(|_| None).collect()).collect();
        let mut inboxes: Vec<Vec<Option<Consumer<Inbound>>>> =
            (0..n).map(|_| (0..n).map(|_| None).collect()).collect();
        for i in 0..n {
            for j in 0..n {
                if i == j {
                    continue;
                }
                let (p, c) = kevy_ring::ring::<Inbound>(self.ring_capacity);
                outboxes[i][j] = Some(p);
                inboxes[j][i] = Some(c);
            }
        }
        let mut wakers: Vec<Arc<Waker>> = Vec::with_capacity(n);
        for _ in 0..n {
            wakers.push(Arc::new(waker()?));
        }
        let parked: Vec<Arc<AtomicBool>> =
            (0..n).map(|_| Arc::new(AtomicBool::new(false))).collect();
        // Shared pub/sub channel registry (one per server, read on every
        // PUBLISH).
        let pubsub: PubSubReg = Arc::new(RwLock::new(HashMap::new()));
        // Shared pub/sub pattern registry. It is empty in steady state, and
        // the channel-only PUBLISH path skips the walk when it is.
        let pubsub_patterns: PubSubPatternReg = Arc::new(RwLock::new(Vec::new()));
        // Build every shard up front so a bind/open failure aborts before we
        // spawn anything.
        let mut shards = Vec::with_capacity(n);
        for id in 0..n {
            // Each shard binds its own SO_REUSEPORT socket on the same
            // ip:port. NOTE(review): with `port == 0` the kernel would
            // likely give each shard a different ephemeral port. Confirm
            // callers never pass 0.
            let listener = tcp_listen_reuseport(self.ip, self.port, 1024)?;
            let aof = if self.enable_aof {
                Some(Aof::open(
                    &self.data_dir.join(format!("aof-{id}.aof")),
                    self.appendfsync,
                )?)
            } else {
                None
            };
            let mut store = Store::new();
            self.commands.on_shard_init(&mut store);
            shards.push(Shard {
                id,
                nshards: n,
                store,
                commands: self.commands.clone(),
                poller: Poller::new()?,
                listener,
                waker: wakers[id].clone(),
                inboxes: std::mem::take(&mut inboxes[id]),
                outboxes: std::mem::take(&mut outboxes[id]),
                backlog: (0..n).map(|_| VecDeque::new()).collect(),
                wakers: wakers.clone(),
                conns: KevyMap::new(),
                fd_to_conn: KevyMap::new(),
                next_conn_id: 1,
                events: Vec::with_capacity(1024),
                read_buf: vec![0u8; 64 * 1024],
                pending_wakes: vec![false; n],
                parked: parked.clone(),
                data_dir: self.data_dir.clone(),
                aof,
                auto_aof_rewrite_pct: self.auto_aof_rewrite_pct,
                auto_aof_rewrite_min_size: self.auto_aof_rewrite_min_size,
                dirty: Vec::new(),
                pubsub: pubsub.clone(),
                pubsub_patterns: pubsub_patterns.clone(),
                psub_local: HashMap::new(),
                publish_batch: (0..n).map(|_| Vec::new()).collect(),
                request_batch: (0..n).map(|_| Vec::new()).collect(),
                notify_flags: crate::NotificationFlags::default(),
                spin_limit: self.spin_limit,
                // `Poller::wait` takes the timeout as `i32` (POSIX
                // poll/epoll convention), but the config knob is `u32`.
                // Saturate at i32::MAX, far above any sane park timeout.
                park_timeout_ms: i32::try_from(self.park_timeout_ms).unwrap_or(i32::MAX),
                tick_check_every: self.tick_check_every,
                slowlog: crate::exec_slowlog::SlowlogState::new(
                    self.slowlog_slower_than_micros,
                    self.slowlog_max_len,
                ),
                blocked: crate::blocked::BlockedClients::new(),
                origin_blocks: std::collections::HashMap::new(),
                xwaiters: crate::block_xshard::XShardWaiters::default(),
            });
        }
        // Pick the reactor once, after every shard has been built and before
        // any shard loads data (see `use_io_uring`). macOS uses kqueue via
        // `Shard::run`.
        #[cfg(target_os = "linux")]
        let use_uring = use_io_uring();
        let mut handles = Vec::with_capacity(n);
        for shard in shards {
            let id = shard.id;
            let shard_stop = Arc::clone(&stop);
            // Named threads make panic messages and `top -H` output
            // attributable to a shard. `Builder::spawn` reports OS refusal
            // as an error instead of panicking like `thread::spawn`.
            let spawned = std::thread::Builder::new()
                .name(format!("kevy-shard-{id}"))
                .spawn(move || {
                    #[cfg(target_os = "linux")]
                    let res = if use_uring {
                        shard.run_uring(shard_stop)
                    } else {
                        shard.run(shard_stop)
                    };
                    #[cfg(not(target_os = "linux"))]
                    let res = shard.run(shard_stop);
                    if let Err(e) = res {
                        eprintln!("kevy: shard {id} exited with error: {e}");
                    }
                });
            match spawned {
                Ok(handle) => handles.push((id, handle)),
                Err(e) => {
                    // Don't leave a half-started server running behind a
                    // returned error: ask the started shards to stop and
                    // reap them first.
                    stop.store(true, std::sync::atomic::Ordering::SeqCst);
                    join_shards(handles);
                    return Err(e);
                }
            }
        }
        join_shards(handles);
        Ok(())
    }
}

/// Choose the Linux reactor: `true` = io_uring, `false` = epoll.
///
/// `KEVY_IO_URING` selects the mode:
/// - unset or empty: auto. Probe io_uring and fall back to epoll if the host
///   can't build the ring, so startup never fails.
/// - `0` / `off` / `no` / `false` (any case, surrounding whitespace
///   ignored): force the epoll readiness reactor.
/// - anything else: force io_uring with no fallback. A setup failure then
///   surfaces loudly, which is what benchmarks and tests want.
///
/// The probe creates and drops a real ring with the `run_uring` parameters.
/// That catches a seccomp-blocked io_uring_setup (Docker's default profile)
/// and pre-5.19 kernels.
#[cfg(target_os = "linux")]
fn use_io_uring() -> bool {
    let forced = std::env::var("KEVY_IO_URING")
        .ok()
        .as_deref()
        .and_then(io_uring_override);
    if let Some(force) = forced {
        return force;
    }
    let avail = crate::uring_reactor::io_uring_available();
    eprintln!(
        "kevy: reactor = {} (io_uring {})",
        if avail { "io_uring" } else { "epoll" },
        if avail { "available" } else { "unavailable — kernel <5.19 or seccomp; using epoll" },
    );
    avail
}

/// Interpret a raw `KEVY_IO_URING` value.
///
/// Returns:
/// - `None` for an empty or whitespace-only value, meaning auto-detect;
/// - `Some(false)` for an "off" word (`0`, `off`, `no`, `false`, any case);
/// - `Some(true)` for anything else, which forces io_uring.
#[cfg(target_os = "linux")]
fn io_uring_override(raw: &str) -> Option<bool> {
    let value = raw.trim();
    if value.is_empty() {
        return None;
    }
    let off = ["0", "off", "no", "false"]
        .iter()
        .any(|word| value.eq_ignore_ascii_case(word));
    Some(!off)
}

/// Wait for every shard thread to finish. A thread that panicked is
/// reported on stderr rather than silently ignored.
fn join_shards(handles: Vec<(usize, std::thread::JoinHandle<()>)>) {
    for (id, handle) in handles {
        if handle.join().is_err() {
            eprintln!("kevy: shard {id} panicked");
        }
    }
}