1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
//! The public entry point: configure and run the thread-per-core server.
use crate::Commands;
use crate::message::{Inbound, PubSubPatternReg, PubSubReg};
use crate::shard::Shard;
use kevy_map::KevyMap;
use kevy_persist::{Aof, Fsync};
use kevy_ring::{Consumer, Producer};
use kevy_store::Store;
use kevy_sys::{Poller, Waker, tcp_listen_reuseport, waker};
use std::collections::{HashMap, VecDeque};
use std::io;
use std::path::PathBuf;
use std::sync::atomic::AtomicBool;
use std::sync::{Arc, RwLock};
/// Capacity, in slots, of every per-core-pair SPSC ring.
///
/// Overflow is not dropped: when a ring is full the message spills into a
/// local backlog (see [`Shard`]). This constant therefore sizes only the
/// lock-free fast path, not the total amount of in-flight work.
const RING_CAPACITY: usize = 1 << 10;
/// Builder and launcher for the thread-per-core server.
///
/// Construct it with [`Runtime::new`] and tune persistence with the `with_*`
/// methods. Then call [`Runtime::run`], which starts one thread per shard.
pub struct Runtime<C>
where
    C: Commands,
{
    /// IPv4 address that every shard's listener binds to.
    ip: [u8; 4],
    /// TCP port shared by all shard listeners.
    port: u16,
    /// Number of shards, and therefore of threads; `new` clamps it to >= 1.
    nshards: usize,
    /// Command table; each shard receives its own clone.
    commands: C,
    /// Where per-shard snapshot files (`dump-<id>.rdb`) and AOF logs live.
    data_dir: PathBuf,
    /// Switch for the append-only log.
    enable_aof: bool,
    /// AOF fsync policy; `EverySec` (the Redis default) unless overridden.
    appendfsync: Fsync,
    /// Percentage growth over the AOF size at the previous rewrite that
    /// auto-triggers BGREWRITEAOF. `0` turns auto-rewrite off; the default is
    /// `100`, as in Redis.
    auto_aof_rewrite_pct: u32,
    /// Minimum AOF size in bytes before auto-rewrite is considered
    /// (default `64 MiB`).
    auto_aof_rewrite_min_size: u64,
}
impl<C: Commands> Runtime<C> {
    /// Create a runtime that listens on `ip:port` with `nshards` shards and
    /// dispatches requests through `commands`.
    ///
    /// `nshards` is clamped to at least 1. Persistence defaults are:
    /// - data directory `.`;
    /// - AOF enabled with `appendfsync = EverySec`;
    /// - auto-rewrite once the AOF has grown 100 % and is at least 64 MiB.
    ///
    /// NOTE(review): every shard binds its own reuse-port listener on `port`.
    /// So `port = 0` presumably gives each shard a *different* ephemeral
    /// port. Confirm this before relying on port 0 (e.g. in tests).
    pub fn new(ip: [u8; 4], port: u16, nshards: usize, commands: C) -> Self {
        Runtime {
            ip,
            port,
            nshards: nshards.max(1),
            commands,
            data_dir: PathBuf::from("."),
            enable_aof: true,
            appendfsync: Fsync::EverySec,
            auto_aof_rewrite_pct: 100,
            auto_aof_rewrite_min_size: 64 * 1024 * 1024,
        }
    }

    /// Set the directory for per-shard snapshot and AOF files (AOF logs are
    /// named `aof-<id>.aof`). Default: `.`.
    pub fn with_data_dir(mut self, dir: impl Into<PathBuf>) -> Self {
        self.data_dir = dir.into();
        self
    }

    /// Enable/disable the append-only log. Default: enabled.
    pub fn with_aof(mut self, on: bool) -> Self {
        self.enable_aof = on;
        self
    }

    /// Set the fsync policy for the AOF. The default, `EverySec`, matches
    /// Redis and loses at most ~1 s of writes on a crash. `Always` is
    /// zero-loss but costs ~50 % throughput. `No` defers everything to the
    /// OS pagecache.
    pub fn with_appendfsync(mut self, fsync: Fsync) -> Self {
        self.appendfsync = fsync;
        self
    }

    /// Auto-trigger BGREWRITEAOF when the live AOF has grown by at least
    /// `pct` percent above its size at the previous rewrite AND is at least
    /// `min_size` bytes.
    ///
    /// `pct = 0` disables auto-rewrite; clients can still run BGREWRITEAOF
    /// manually. Defaults: 100 % / 64 MiB.
    pub fn with_auto_aof_rewrite(mut self, pct: u32, min_size: u64) -> Self {
        self.auto_aof_rewrite_pct = pct;
        self.auto_aof_rewrite_min_size = min_size;
        self
    }

    /// Spawn one thread per shard and block until every shard thread exits.
    ///
    /// All rings, wakers, pollers, listeners and AOF files are created before
    /// any thread is spawned, so a bind/open failure returns `Err` without
    /// starting the server. Shards run until `stop` is set. A shard that
    /// returns an error or panics is reported on stderr, and the remaining
    /// shards keep running.
    ///
    /// On Linux, set `KEVY_IO_URING` to an enabling value (e.g. `1`) to use the
    /// io_uring reactor. Unset, empty, `0`, `false`, `no` or `off` keeps the
    /// default readiness reactor.
    ///
    /// # Errors
    ///
    /// Returns any I/O error from creating wakers, pollers, listeners or AOF
    /// files, or from spawning a shard thread. If a thread fails to spawn,
    /// `stop` is set and the shards already started are joined before the
    /// error is returned.
    pub fn run(self, stop: Arc<AtomicBool>) -> io::Result<()> {
        let n = self.nshards;
        // One lock-free SPSC ring per ordered core-pair (i→j): the producer goes
        // to shard i's outbox[j], the consumer to shard j's inbox[i]. There is no
        // self-ring — a shard runs its own commands inline, never over a ring.
        let mut outboxes: Vec<Vec<Option<Producer<Inbound>>>> =
            (0..n).map(|_| (0..n).map(|_| None).collect()).collect();
        let mut inboxes: Vec<Vec<Option<Consumer<Inbound>>>> =
            (0..n).map(|_| (0..n).map(|_| None).collect()).collect();
        for i in 0..n {
            for j in 0..n {
                if i == j {
                    continue;
                }
                let (p, c) = kevy_ring::ring::<Inbound>(RING_CAPACITY);
                outboxes[i][j] = Some(p);
                inboxes[j][i] = Some(c);
            }
        }
        let mut wakers: Vec<Arc<Waker>> = Vec::with_capacity(n);
        for _ in 0..n {
            wakers.push(Arc::new(waker()?));
        }
        let parked: Vec<Arc<AtomicBool>> =
            (0..n).map(|_| Arc::new(AtomicBool::new(false))).collect();
        // Shared pub/sub channel registry (one per server, read on every PUBLISH).
        let pubsub: PubSubReg = Arc::new(RwLock::new(HashMap::new()));
        // Shared pub/sub pattern registry. It is empty in steady state, and
        // the channel-only PUBLISH path skips the walk when it is.
        let pubsub_patterns: PubSubPatternReg = Arc::new(RwLock::new(Vec::new()));
        // Build every shard up front so a bind/open failure aborts before we spawn.
        let mut shards = Vec::with_capacity(n);
        for id in 0..n {
            let listener = tcp_listen_reuseport(self.ip, self.port, 1024)?;
            let aof = if self.enable_aof {
                Some(Aof::open(
                    &self.data_dir.join(format!("aof-{id}.aof")),
                    self.appendfsync,
                )?)
            } else {
                None
            };
            let mut store = Store::new();
            self.commands.on_shard_init(&mut store);
            shards.push(Shard {
                id,
                nshards: n,
                store,
                commands: self.commands.clone(),
                poller: Poller::new()?,
                listener,
                waker: wakers[id].clone(),
                inboxes: std::mem::take(&mut inboxes[id]),
                outboxes: std::mem::take(&mut outboxes[id]),
                backlog: (0..n).map(|_| VecDeque::new()).collect(),
                wakers: wakers.clone(),
                conns: KevyMap::new(),
                fd_to_conn: KevyMap::new(),
                next_conn_id: 1,
                events: Vec::with_capacity(1024),
                read_buf: vec![0u8; 64 * 1024],
                pending_wakes: vec![false; n],
                parked: parked.clone(),
                data_dir: self.data_dir.clone(),
                aof,
                auto_aof_rewrite_pct: self.auto_aof_rewrite_pct,
                auto_aof_rewrite_min_size: self.auto_aof_rewrite_min_size,
                dirty: Vec::new(),
                pubsub: pubsub.clone(),
                pubsub_patterns: pubsub_patterns.clone(),
                psub_local: HashMap::new(),
                publish_batch: (0..n).map(|_| Vec::new()).collect(),
                request_batch: (0..n).map(|_| Vec::new()).collect(),
            });
        }
        // Opt into the Linux io_uring (completion) reactor with KEVY_IO_URING=1;
        // otherwise use the readiness reactor (epoll/kqueue), the default + macOS.
        // The variable's *value* is checked, so KEVY_IO_URING=0 does not enable it.
        #[cfg(target_os = "linux")]
        let use_uring = env_flag_enabled("KEVY_IO_URING");
        let mut handles: Vec<(usize, std::thread::JoinHandle<()>)> = Vec::with_capacity(n);
        for shard in shards {
            let shard_stop = Arc::clone(&stop);
            let id = shard.id;
            // Builder::spawn reports OS thread-creation failure as an io::Error
            // instead of panicking like std::thread::spawn.
            let spawned = std::thread::Builder::new()
                .name(format!("kevy-shard-{id}"))
                .spawn(move || {
                    #[cfg(target_os = "linux")]
                    let res = if use_uring {
                        shard.run_uring(shard_stop)
                    } else {
                        shard.run(shard_stop)
                    };
                    #[cfg(not(target_os = "linux"))]
                    let res = shard.run(shard_stop);
                    if let Err(e) = res {
                        eprintln!("kevy: shard {id} exited with error: {e}");
                    }
                });
            match spawned {
                Ok(handle) => handles.push((id, handle)),
                Err(e) => {
                    // Don't leave a partial server running detached. Signal the
                    // shards already started to stop (the same flag callers use
                    // for shutdown), wait for them, then surface the error.
                    // Shards not yet spawned are dropped with the iterator.
                    stop.store(true, std::sync::atomic::Ordering::SeqCst);
                    join_shards(handles);
                    return Err(e);
                }
            }
        }
        join_shards(handles);
        Ok(())
    }
}

/// Wait for every shard thread. Any thread that panicked is reported by shard
/// id rather than silently discarded.
fn join_shards(handles: Vec<(usize, std::thread::JoinHandle<()>)>) {
    for (id, handle) in handles {
        if handle.join().is_err() {
            // With the default panic hook, the panic message itself has
            // already been printed to stderr; this line just names the shard.
            eprintln!("kevy: shard {id} panicked");
        }
    }
}

/// Interpret an opt-in environment flag.
///
/// Returns `false` when the variable is unset or its trimmed value is empty,
/// `0`, `false`, `no` or `off` (case-insensitive). Any other value enables it.
#[cfg(target_os = "linux")]
fn env_flag_enabled(name: &str) -> bool {
    let Some(raw) = std::env::var_os(name) else {
        return false;
    };
    let raw = raw.to_string_lossy();
    let value = raw.trim();
    let disabled = value.is_empty()
        || value == "0"
        || ["false", "no", "off"]
            .iter()
            .any(|off| value.eq_ignore_ascii_case(off));
    !disabled
}