Skip to main content

kevy_rt/
runtime.rs

1//! The public entry point: configure and run the thread-per-core server.
2
3use crate::Commands;
4use crate::message::{Inbound, PubSubPatternReg, PubSubReg};
5use crate::shard::Shard;
6use kevy_map::KevyMap;
7use kevy_persist::{Aof, Fsync};
8use kevy_ring::{Consumer, Producer};
9use kevy_store::Store;
10use kevy_sys::{Poller, Waker, tcp_listen_reuseport, waker};
11use std::collections::{HashMap, VecDeque};
12use std::io;
13use std::path::PathBuf;
14use std::sync::atomic::{AtomicBool, AtomicU64};
15use std::sync::{Arc, RwLock};
16
/// Default number of slots in every per-core-pair SPSC ring.
///
/// This bounds only the lock-free fast path, not capacity: once a ring is
/// full, further messages spill to a local backlog (see [`Shard`]). The
/// value can be overridden through the `[advanced] ring_capacity` config
/// field, which is threaded through [`Runtime::with_advanced`].
const DEFAULT_RING_CAPACITY: usize = 1 << 10;
23
24/// The public entry point: configure and run the thread-per-core server.
25pub struct Runtime<C: Commands> {
26    pub(crate) ip: [u8; 4],
27    pub(crate) port: u16,
28    pub(crate) nshards: usize,
29    pub(crate) commands: C,
30    /// Directory for per-shard snapshot files (`dump-<id>.rdb`) and AOF logs.
31    pub(crate) data_dir: PathBuf,
32    /// Whether the append-only log is enabled.
33    pub(crate) enable_aof: bool,
34    /// fsync policy for the AOF. Default `EverySec` matches Redis.
35    pub(crate) appendfsync: Fsync,
36    /// auto-trigger BGREWRITEAOF when AOF grew this many % above the size
37    /// at the previous rewrite. `0` disables. Default `100` (matches Redis).
38    pub(crate) auto_aof_rewrite_pct: u32,
39    /// Floor below which auto-rewrite is skipped. Default `64 MiB`.
40    pub(crate) auto_aof_rewrite_min_size: u64,
41    /// Reactor SPSC ring slot count. See [`DEFAULT_RING_CAPACITY`].
42    pub(crate) ring_capacity: usize,
43    /// Reactor busy-poll iter limit before parking. Stored as `u32`
44    /// for the per-shard counter; the [`Shard`] field carries it
45    /// forward into the loop.
46    pub(crate) spin_limit: u32,
47    /// **v1.30** — `Some(N)` = only shards `0..N` arm accept SQE. `None`
48    /// = every shard accepts (v1.29 byte-identical).
49    pub(crate) accept_shards: Option<usize>,
50    /// **v1.37** — total cap on active client conns. `0` = unlimited.
51    pub(crate) max_clients: usize,
52    /// Reactor blocking-wait timeout in ms when parked.
53    pub(crate) park_timeout_ms: u32,
54    /// Wall-clock-read throttle for the tick check (TTL reaper / live
55    /// config refresh / auto-AOF-rewrite).
56    pub(crate) tick_check_every: u32,
57    /// `[slowlog].slower_than_micros`. Default: `-1` (OFF — zero
58    /// hot-path cost: every command would otherwise pay an
59    /// `Instant::now()` pair around dispatch). Set to `10_000` to match
60    /// Redis's default 10 ms threshold; see [`Self::with_slowlog`] /
61    /// `CONFIG SET slowlog-log-slower-than 10000`.
62    pub(crate) slowlog_slower_than_micros: i64,
63    /// `[slowlog].max_len`. Per-shard cap.
64    pub(crate) slowlog_max_len: u32,
65    /// Single-node cluster mode: slot-based key routing (CRC16 `{hashtag}`
66    /// → contiguous ranges) + one deterministic extra listener per shard at
67    /// `cluster_port_base + id`. `None` = off (default, zero change).
68    pub(crate) cluster_port_base: Option<u16>,
69    /// v3-cluster replication: when `true`, each shard runs a
70    /// `ReplicationSource` with `replication_buffer_size` byte budget;
71    /// every applied mutation is pushed to the backlog. The TCP
72    /// listener + streaming loop arrive in subsequent tasks (T1.12+);
73    /// this batch only wires the producer side. Default `false`.
74    pub(crate) enable_replication: bool,
75    /// Per-shard backlog byte budget when `enable_replication` is set.
76    /// Fed from `[replication] replication_buffer_size`. Default
77    /// `256 MiB` (matches the kevy-config default).
78    pub(crate) replication_buffer_size: u64,
79    /// v3-cluster replication listener: shard `i` binds at
80    /// `replication_port_base + i` (mirrors cluster listener pattern;
81    /// per Issue Ledger I2). `None` = no listener (producer side runs
82    /// without a network surface, backlog accumulates and evicts —
83    /// useful for benchmarks). Default `None`.
84    pub(crate) replication_port_base: Option<u16>,
85    /// Per-shard SlotTable reconnect-window in ms (T1.15). After a
86    /// streaming replica disconnects, its `(replica_id, sent_offset)`
87    /// is recorded in the shard's `slots` map; slots past this age
88    /// are reaped on the next shard tick. Default `60_000` (60 s)
89    /// matches the kevy-config default.
90    pub(crate) replication_reconnect_window_ms: u32,
91    /// Per-shard replica inboxes installed by
92    /// [`Self::with_replica_inboxes`]. Each entry is consumed
93    /// (via `Option::take`) when its shard is constructed, so the
94    /// receiver flows from this Vec to the matching `Shard.replica_inbox`.
95    /// Empty when no replica mode is configured.
96    pub(crate) replica_inboxes: Vec<Option<crate::replica_inbox::ReplicaInboxReceiver>>,
97    /// v1.25 UDS: when `Some(path)`, ALSO bind a Unix-domain stream
98    /// listener at `path` on shard 0 (single global socket, like valkey's
99    /// `unixsocket` config). Lets benches/local clients skip TCP loopback
100    /// overhead. TCP listener stays bound regardless.
101    #[allow(dead_code)] // consumed during run() via take-into-Shard
102    pub(crate) unix_socket_path: Option<PathBuf>,
103}
104
105impl<C: Commands> Runtime<C> {
106    #[must_use]
107    pub fn new(ip: [u8; 4], port: u16, nshards: usize, commands: C) -> Self {
108        Runtime {
109            ip,
110            port,
111            nshards: nshards.max(1),
112            commands,
113            data_dir: PathBuf::from("."),
114            enable_aof: true,
115            appendfsync: Fsync::EverySec,
116            auto_aof_rewrite_pct: 100,
117            auto_aof_rewrite_min_size: 64 * 1024 * 1024,
118            ring_capacity: DEFAULT_RING_CAPACITY,
119            spin_limit: 256,
120            accept_shards: None,
121            max_clients: 10_000,
122            park_timeout_ms: 50,
123            tick_check_every: 256,
124            slowlog_slower_than_micros: -1,
125            slowlog_max_len: 128,
126            cluster_port_base: None,
127            enable_replication: false,
128            replica_inboxes: Vec::new(),
129            replication_buffer_size: 256 * 1024 * 1024,
130            replication_port_base: None,
131            replication_reconnect_window_ms: 60_000,
132            unix_socket_path: None,
133        }
134    }
135
136
137    /// Spawn one thread per shard and run until `stop` is set.
138    /// v1.25 UDS: also bind a Unix-domain stream listener at `path`. Lets
139    /// local clients (and benchmarks) skip the TCP loopback round-trip.
140    /// Bound on shard 0 only (no SO_REUSEPORT for AF_UNIX, single global
141    /// socket like valkey's `unixsocket` config). TCP listener stays
142    /// bound at the configured `port` regardless.
143    #[must_use]
144    pub fn with_unix_socket(mut self, path: PathBuf) -> Self {
145        self.unix_socket_path = Some(path);
146        self
147    }
148
149    pub fn run(mut self, stop: Arc<AtomicBool>) -> io::Result<()> {
150        let n = self.nshards;
151
152        // v1.25 A.3 (B2: single global bio thread, per
153        // `bench/V125-DECISIONS-PENDING.md`). Spawn BEFORE shards so
154        // every shard's first overwrite already has a live consumer.
155        // The held `bio_send` is moved into the shard loop below
156        // (`store.set_bio_drop_sender`); shutdown ordering is:
157        //   1. shards return → their `Store`s drop → their cloned
158        //      Sender halves drop
159        //   2. this fn's local `bio_send` is dropped here at end of
160        //      scope → channel closes → bio thread's `recv()` returns
161        //      Err → bio thread exits
162        //   3. `bio_handle.join()` blocks until that exit so a final
163        //      large free isn't truncated by process tear-down
164        // (`madvise` returning the page to the kernel still needs the
165        // process alive). See `crate::bio` for the full rationale.
166        let (bio_send, bio_handle) = crate::bio::spawn();
167
168        // Cluster binds shard `i` at `port_base + i`; reject a range that
169        // overflows u16 up front (loud) instead of wrapping a listener onto
170        // a low/privileged port while CLUSTER SLOTS advertises 65536+.
171        if let Some(base) = self.cluster_port_base
172            && base as usize + n > u16::MAX as usize + 1
173        {
174            return Err(io::Error::new(
175                io::ErrorKind::InvalidInput,
176                format!(
177                    "cluster port range {base}..={} exceeds 65535 ({n} shards)",
178                    base as usize + n - 1
179                ),
180            ));
181        }
182
183        // Same overflow check for the replication port range
184        // (`base + 0 .. base + n`). See Issue Ledger I2 for the
185        // per-shard listener decision.
186        if let Some(base) = self.replication_port_base
187            && base as usize + n > u16::MAX as usize + 1
188        {
189            return Err(io::Error::new(
190                io::ErrorKind::InvalidInput,
191                format!(
192                    "replication port range {base}..={} exceeds 65535 ({n} shards)",
193                    base as usize + n - 1
194                ),
195            ));
196        }
197
198        // One lock-free SPSC ring per ordered core-pair (i→j): the producer goes
199        // to shard i's outbox[j], the consumer to shard j's inbox[i]. There is no
200        // self-ring — a shard runs its own commands inline, never over a ring.
201        let mut outboxes: Vec<Vec<Option<Producer<Inbound>>>> =
202            (0..n).map(|_| (0..n).map(|_| None).collect()).collect();
203        let mut inboxes: Vec<Vec<Option<Consumer<Inbound>>>> =
204            (0..n).map(|_| (0..n).map(|_| None).collect()).collect();
205        for i in 0..n {
206            for j in 0..n {
207                if i == j {
208                    continue;
209                }
210                let (p, c) = kevy_ring::ring::<Inbound>(self.ring_capacity);
211                outboxes[i][j] = Some(p);
212                inboxes[j][i] = Some(c);
213            }
214        }
215
216        let mut wakers: Vec<Arc<Waker>> = Vec::with_capacity(n);
217        for _ in 0..n {
218            wakers.push(Arc::new(waker()?));
219        }
220        let parked: Vec<Arc<crate::shard::CachePadded<AtomicBool>>> = (0..n)
221            .map(|_| Arc::new(crate::shard::CachePadded::new(AtomicBool::new(false))))
222            .collect();
223        // Per-shard inbox-dirty bitmaps (one u64 bit per peer src).
224        // Senders OR a bit on the target's dirty word; the target's
225        // `drain_inbound_core` swaps and short-circuits when 0.
226        assert!(
227            n <= 64,
228            "kevy-rt: shard count {n} exceeds 64 — inbound_dirty bitmap holds one bit per peer in a u64. Reduce --threads or extend to a multi-word bitmap.",
229        );
230        // A2 (2026-06-20): pad each Arc<AtomicU64> to a full 64-byte cache
231        // line. H1 c2c diagnostic showed cross-shard fetch_or vs. owner
232        // swap on adjacent atomics bounced cache lines between cores.
233        let inbound_dirty: Vec<Arc<crate::shard::CachePadded<AtomicU64>>> = (0..n)
234            .map(|_| Arc::new(crate::shard::CachePadded::new(AtomicU64::new(0))))
235            .collect();
236
237        // Shared pub/sub channel registry (one per server, read on every PUBLISH).
238        let pubsub: PubSubReg = Arc::new(RwLock::new(HashMap::new()));
239        // Shared pub/sub pattern registry. Empty in steady state — the
240        // channel-only PUBLISH path skips the walk when so.
241        let pubsub_patterns: PubSubPatternReg = Arc::new(RwLock::new(Vec::new()));
242
243        // Reconcile the on-disk shard layout (count + routing) before any
244        // shard loads its files; a mismatch re-homes every key once, here.
245        // Skipped for a pure in-memory run against a dir with no kevy files.
246        // Cluster mode always records the layout even with AOF off and an
247        // empty dir: a later SAVE writes slot-distributed `dump-{i}.rdb`, and
248        // without a meta a non-cluster restart would read them as KevyHash
249        // and silently strand every key.
250        if self.enable_aof
251            || self.cluster_port_base.is_some()
252            || crate::reshard::has_kevy_files(&self.data_dir)
253        {
254            let routing = if self.cluster_port_base.is_some() {
255                kevy_persist::Routing::Slots
256            } else {
257                kevy_persist::Routing::KevyHash
258            };
259            crate::reshard::ensure_layout(&self.data_dir, n, routing, &self.commands)?;
260        }
261
262        // Advertised cluster topology (None = cluster off). A 0.0.0.0 bind
263        // advertises 127.0.0.1 — an unroutable redirect target would strand
264        // every cluster client (single-machine scope; no announce-ip knob).
265        let topo = self.cluster_port_base.map(|base| crate::cluster::ClusterTopo {
266            ip: if self.ip == [0, 0, 0, 0] { [127, 0, 0, 1] } else { self.ip },
267            port_base: base,
268        });
269
270        // Build every shard up front so a bind/open failure aborts before we spawn.
271        let mut shards = Vec::with_capacity(n);
272        // UDS listener: only ONE per server (no SO_REUSEPORT for AF_UNIX), so
273        // it lives on shard 0. Bound up-front so a bind failure aborts before
274        // any shard spawns.
275        let mut unix_listener: Option<kevy_sys::Socket> = None;
276        if let Some(p) = self.unix_socket_path.as_ref() {
277            let path_bytes = p.to_string_lossy();
278            unix_listener = Some(kevy_sys::unix_listen(path_bytes.as_bytes(), 1024)?);
279        }
280        for id in 0..n {
281            let arms_accept = self.accept_shards.map_or(true, |k| id < k);
282            // v1.30 — off-accept-set shards skip the SO_REUSEPORT bind so
283            // the kernel routes new conns only to the armed subset.
284            let listener = if arms_accept {
285                Some(tcp_listen_reuseport(self.ip, self.port, 1024)?)
286            } else {
287                None
288            };
289            // Cluster mode: a second, deterministic per-shard listener at
290            // port_base + id (plain bind — exactly one owner per port).
291            let cluster_listener = match self.cluster_port_base {
292                Some(base) => Some(kevy_sys::tcp_listen(self.ip, base + id as u16, 1024)?),
293                None => None,
294            };
295            // Replication listener (per Issue Ledger I2): per-shard
296            // deterministic port, same `tcp_listen` (no SO_REUSEPORT)
297            // pattern as cluster. A replica's shard-aware client will
298            // connect to every `base + id` to mirror the full keyspace.
299            let replication_listener = match self.replication_port_base {
300                Some(base) => Some(kevy_sys::tcp_listen(self.ip, base + id as u16, 1024)?),
301                None => None,
302            };
303            let aof = if self.enable_aof {
304                Some(Aof::open(
305                    &kevy_persist::layout::aof_path(&self.data_dir, id),
306                    self.appendfsync,
307                )?)
308            } else {
309                None
310            };
311            let mut store = Store::new();
312            // The reactor loop refreshes the store clock once per batch, so
313            // lazy expiry can trust the cached clock (skip per-command
314            // `Instant::now()`).
315            store.set_cached_clock(true);
316            // v1.25 A.3: hand the bio-drop channel sender to the store so
317            // SET overwrites of heavy values (Arc<[u8]> ≥ 256 B, non-empty
318            // collections) get freed off-reactor. Sender clone is cheap
319            // (`Arc::clone`); the bio thread is shared across all shards
320            // (B2 single-global, mirrors valkey `bio.c`).
321            store.set_bio_drop_sender(bio_send.clone());
322            self.commands.on_shard_init(&mut store);
323            shards.push(Shard {
324                id,
325                nshards: n,
326                cluster: topo.clone(),
327                cluster_listener,
328                // UDS: only shard 0 holds the (single) unix listener.
329                unix_listener: if id == 0 { unix_listener.take() } else { None },
330                store,
331                commands: self.commands.clone(),
332                poller: Poller::new()?,
333                listener,
334                waker: wakers[id].clone(),
335                inboxes: std::mem::take(&mut inboxes[id]),
336                outboxes: std::mem::take(&mut outboxes[id]),
337                backlog: (0..n).map(|_| VecDeque::new()).collect(),
338                wakers: wakers.clone(),
339                conns: KevyMap::new(),
340                arm_pending: Vec::new(),
341                closing_uring_conns: Vec::new(),
342                fd_to_conn: KevyMap::new(),
343                next_conn_id: 1,
344                events: Vec::with_capacity(1024),
345                read_buf: vec![0u8; 64 * 1024],
346                pending_wakes: 0,
347                backlog_nonempty: 0,
348                request_batch_nonempty: 0,
349                publish_batch_nonempty: 0,
350                parked: parked.clone(),
351                inbound_dirty: inbound_dirty.clone(),
352                data_dir: self.data_dir.clone(),
353                aof,
354                replicate: if self.enable_replication {
355                    Some(kevy_replicate::source::ReplicationSource::new(
356                        usize::try_from(self.replication_buffer_size)
357                            .unwrap_or(usize::MAX),
358                    ))
359                } else {
360                    None
361                },
362                replication_listener,
363                replicas: Vec::new(),
364                slots: kevy_replicate::slot::SlotTable::new(),
365                replication_reconnect_window_ms: self.replication_reconnect_window_ms,
366                replication_epoch: std::time::Instant::now(),
367                replica_inbox: self.replica_inboxes.get_mut(id).and_then(Option::take),
368                replica_snapshot_buf: Vec::new(),
369                persist: crate::persist_worker::PersistWorker::new(),
370                auto_aof_rewrite_pct: self.auto_aof_rewrite_pct,
371                auto_aof_rewrite_min_size: self.auto_aof_rewrite_min_size,
372                dirty: Vec::new(),
373                pubsub: pubsub.clone(),
374                pubsub_patterns: pubsub_patterns.clone(),
375                psub_local: HashMap::new(),
376                subs_by_channel: HashMap::new(),
377                publish_batch: (0..n).map(|_| Vec::new()).collect(),
378                request_batch: (0..n).map(|_| Vec::new()).collect(),
379                // Seed from the live config at construction, not default():
380                // these flags were otherwise blind until the first 100 ms
381                // shard tick, so a write landing before that never fired
382                // its keyspace notification (CI-visible flake; a real
383                // startup gap for any pre-configured notify_keyspace_events).
384                notify_flags: self
385                    .commands
386                    .live_runtime_config()
387                    .notify_flags
388                    .unwrap_or_default(),
389                spin_limit: self.spin_limit,
390                arms_accept: self.accept_shards.map_or(true, |n| id < n),
391                max_clients_per_shard: if self.max_clients == 0 {
392                    0
393                } else {
394                    self.max_clients.div_ceil(n)
395                },
396                rejected_connections: 0,
397                // `Poller::wait` takes the timeout as `i32` (POSIX
398                // poll/epoll convention). The config knob is `u32` —
399                // we clamp to i32::MAX, far above any sane park-timeout.
400                park_timeout_ms: self.park_timeout_ms.min(i32::MAX as u32) as i32,
401                tick_check_every: self.tick_check_every,
402                slowlog: crate::exec_slowlog::SlowlogState::new(
403                    self.slowlog_slower_than_micros,
404                    self.slowlog_max_len,
405                ),
406                blocked: crate::blocked::BlockedClients::new(),
407                origin_blocks: std::collections::HashMap::new(),
408                xwaiters: crate::block_xshard::XShardWaiters::default(),
409                reply_scratch: Vec::with_capacity(4096),
410                argv_pool: kevy_resp::ArgvPool::new(),
411            });
412        }
413
414        // Reactor selection on Linux:
415        //   KEVY_IO_URING unset → auto: try io_uring, fall back to epoll if the
416        //     host can't build the ring (probe below) — startup never fails.
417        //   KEVY_IO_URING=0/off/no/false → force the epoll readiness reactor.
418        //   KEVY_IO_URING=<anything else> → force io_uring (no fallback; a
419        //     setup failure then surfaces loudly — for benchmarks / tests).
420        // The probe creates+drops a real ring with the run_uring parameters, so
421        // it catches a seccomp-blocked io_uring_setup (Docker's default profile)
422        // and pre-5.19 kernels before any shard loads data. (macOS = kqueue.)
423        #[cfg(target_os = "linux")]
424        let use_uring = match std::env::var("KEVY_IO_URING").ok().as_deref() {
425            Some("0") | Some("off") | Some("no") | Some("false") => false,
426            Some(_) => true,
427            None => {
428                let avail = crate::uring_reactor::io_uring_available();
429                eprintln!(
430                    "kevy: reactor = {} (io_uring {})",
431                    if avail { "io_uring" } else { "epoll" },
432                    if avail { "available" } else { "unavailable — kernel <5.19 or seccomp; using epoll" },
433                );
434                avail
435            }
436        };
437
438        // v1.18.0: the replication listener + accept path is wired only
439        // through the epoll/kqueue reactor (`shard.run`); the io_uring
440        // T1.12.5: io_uring + replication is now supported. The
441        // replication-adjacent work (accept / read / write / pump /
442        // slot+view+watermark ticks) is poll-driven from the io_uring
443        // reactor's tick path (mostly per-tick @ 10 Hz, with
444        // `pump_replication` + `reap_closed_replicas` per-iter via
445        // their own early returns when nothing's live). Throughput
446        // path stays io_uring-native — only replica metadata uses
447        // polling. See `Shard::run_uring`.
448
449        let mut handles = Vec::with_capacity(n);
450        for shard in shards {
451            let stop = stop.clone();
452            let id = shard.id;
453            handles.push(std::thread::spawn(move || {
454                #[cfg(target_os = "linux")]
455                let res = if use_uring { shard.run_uring(stop) } else { shard.run(stop) };
456                #[cfg(not(target_os = "linux"))]
457                let res = shard.run(stop);
458                if let Err(e) = res {
459                    eprintln!("kevy: shard {id} exited with error: {e}");
460                }
461            }));
462        }
463        for h in handles {
464            let _ = h.join();
465        }
466        // v1.25 A.3 shutdown: every shard has joined → every cloned
467        // sender on every Store has been dropped. Drop the last live
468        // sender (this fn's `bio_send`) so the channel closes; the bio
469        // thread's `recv()` returns Err and it exits its loop. The
470        // `join()` then blocks until that exit completes — guarding
471        // against process tear-down while a final large free is in
472        // flight (an unsafe wrt `madvise`/`munmap` semantics — the
473        // kernel needs the process alive to actually release pages).
474        drop(bio_send);
475        let _ = bio_handle.join();
476        Ok(())
477    }
478}