Skip to main content

kevy_rt/
lib.rs

1//! kevy-rt — shared-nothing, thread-per-core runtime.
2//!
3//! Each core runs its own reactor (kqueue/epoll) and owns one **shard** of the
4//! keyspace (`hash(key) % nshards`). There is no shared mutable state and no
5//! lock on the hot path — cores communicate only by message passing over
6//! channels, woken via a self-pipe ([`kevy_sys::Waker`]). Connections are spread
7//! across cores by `SO_REUSEPORT`; a command whose key lives on another core is
8//! forwarded to that core, executed there, and the reply routed back to the
9//! originating connection.
10//!
11//! Per-connection reply ordering is preserved (RESP is pipelined): each command
12//! gets a monotonic seq; replies are emitted only in contiguous seq order, so an
13//! async cross-core reply never overtakes an earlier one.
14//!
15//! The cross-core channel currently uses `std::sync::mpsc` (pure Rust, zero
16//! deps); swapping in a lock-free SPSC/MPSC ring is a perf-polish item.
17//! Command semantics are injected via the [`Commands`] trait, keeping the
18//! runtime independent of the concrete command set. Part of the [kevy] server.
19//!
20//! [kevy]: https://crates.io/crates/kevy
21//!
22//! # Module map
23//!
24//! - [`Runtime`] (in `runtime`) — public entry point; spawns one `shard` per core.
25//! - `shard` — the per-core reactor: sockets, the inbound queue, reply flushing.
26//! - `exec` — command semantics: routing, execution, and result reduction.
27//! - `message` — internal cross-core work/result types.
28//! - `conn` — per-connection state (input/output, seq ring, subscriptions).
29//! - `reduce` — reply reduction (`materialize`) and pure helpers (set algebra,
30//!   shard hashing, pub/sub framing).
31//!
32//! # Example
33//!
34//! Implement [`Commands`] for your command set and run it. ([`Store`] is
35//! re-exported so you don't need a separate dependency.)
36//!
37//! ```no_run
38//! use kevy_rt::{ArgvView, Commands, Route, Runtime, Store, TxnKind};
39//! use std::sync::Arc;
40//! use std::sync::atomic::AtomicBool;
41//!
42//! #[derive(Clone)]
43//! struct MyCommands;
44//! impl Commands for MyCommands {
45//!     fn route<A: ArgvView + ?Sized>(&self, args: &A) -> Route {
46//!         if args.len() >= 2 { Route::Single(1) } else { Route::Local }
47//!     }
48//!     fn dispatch<A: ArgvView + ?Sized>(&self, _store: &mut Store, _args: &A) -> Vec<u8> {
49//!         b"+OK\r\n".to_vec()
50//!     }
51//!     fn is_quit<A: ArgvView + ?Sized>(&self, args: &A) -> bool {
52//!         args.first().is_some_and(|c| c.eq_ignore_ascii_case(b"QUIT"))
53//!     }
54//!     fn is_write<A: ArgvView + ?Sized>(&self, _args: &A) -> bool { false }
55//!     fn txn_kind<A: ArgvView + ?Sized>(&self, _args: &A) -> TxnKind { TxnKind::Other }
56//! }
57//!
58//! // One shard per core, listening on 127.0.0.1:6379, until `stop` is set.
59//! let rt = Runtime::new([127, 0, 0, 1], 6379, 4, MyCommands);
60//! rt.run(Arc::new(AtomicBool::new(false))).unwrap();
61//! ```
62// Almost entirely safe: the only `unsafe` is in `uring_reactor` (Linux io_uring),
63// which needs raw buffer pointers for zero-allocation completion I/O — on the hot
64// path toward kevy's disk-I/O-ceiling goal, where a buffer-ownership safe wrapper
65// would add per-op cost. Each such block documents its invariant; the
66// epoll/kqueue path and every other module stay safe, and all libc lives in
67// kevy-sys.
68#![deny(unsafe_op_in_unsafe_fn)]
69
70mod bio;
71mod block_xshard;
72mod blocked;
73mod lua_wake_bridge;
74mod cache_padded;
75mod cluster;
76mod conn;
77mod exec;
78mod exec_build;
79mod exec_crossslot;
80mod exec_dispatch;
81mod exec_notify;
82mod exec_op;
83mod exec_pubsub;
84mod exec_pubsub_pattern;
85mod exec_rename;
86mod exec_slowlog;
87mod exec_watch;
88mod inbox;
89mod persist_worker;
90mod message;
91mod reduce;
92mod replica_inbox;
93mod replication;
94mod replication_apply;
95mod replication_gate;
96mod replication_io;
97mod replication_pump;
98mod reshard;
99mod route;
100mod runtime;
101mod runtime_builders;
102mod shard;
103mod shard_flush;
104mod shard_lifecycle;
105mod shard_tick;
106#[cfg(target_os = "linux")]
107mod uring_arm;
108#[cfg(target_os = "linux")]
109mod uring_bigbulk;
110#[cfg(target_os = "linux")]
111mod uring_bigbulk_b2alt;
112#[cfg(target_os = "linux")]
113mod uring_bigbulk_probe;
114#[cfg(target_os = "linux")]
115mod uring_conn;
116#[cfg(target_os = "linux")]
117mod uring_inbox;
118#[cfg(target_os = "linux")]
119mod uring_io;
120#[cfg(target_os = "linux")]
121mod uring_park;
122#[cfg(target_os = "linux")]
123mod uring_reactor;
124
125pub use blocked::{BlockHint, BlockKind};
126pub use lua_wake_bridge::push_lua_wake_key;
127pub use reduce::shard_of as shard_of_key;
128pub use cluster::shard_slot_range;
129pub use exec_slowlog::{SlowlogSub, parse_slowlog_sub};
130pub use kevy_config::NotificationFlags;
131pub use kevy_persist::Fsync;
132pub use kevy_resp::{Argv, ArgvBorrowed, ArgvView, RespVersion};
133pub use kevy_store::Store;
134pub use replica_inbox::{ReplicaApply, ReplicaInboxReceiver, ReplicaInboxSender, replica_inbox_pair};
135pub use replication_gate::ReplicatedApplyGuard;
136pub use route::{Route, XGroupCtx};
137pub use runtime::Runtime;
138
139/// Command-set semantics injected into the runtime. Cloned to every core, so it
140/// must be cheap/stateless to clone.
141pub trait Commands: Clone + Send + 'static {
142    /// Classify how a command is routed across shards.
143    fn route<A: ArgvView + ?Sized>(&self, args: &A) -> Route;
144    /// Execute a full command against one shard's store, returning RESP bytes.
145    fn dispatch<A: ArgvView + ?Sized>(&self, store: &mut Store, args: &A) -> Vec<u8>;
146    /// RESP3 variant of [`Self::dispatch`] — called when the connection
147    /// has negotiated `HELLO 3`. Default: delegate to the RESP2 path
148    /// (the cross-shard forward carries a per-cmd `RespVersion`
149    /// so a V2 client and a V3 client can share the owning shard).
150    fn dispatch_resp3<A: ArgvView + ?Sized>(&self, store: &mut Store, args: &A) -> Vec<u8> {
151        self.dispatch(store, args)
152    }
153    /// Execute a command, appending the RESP reply to `out`. The in-order local
154    /// fast path uses this to write straight into the connection's output buffer
155    /// (no per-command reply `Vec`). Default: delegate to [`dispatch`](Self::dispatch).
156    fn dispatch_into<A: ArgvView + ?Sized>(&self, store: &mut Store, args: &A, out: &mut Vec<u8>) {
157        out.extend_from_slice(&self.dispatch(store, args));
158    }
159    /// RESP3 variant of [`Self::dispatch_into`] — called when the
160    /// connection has negotiated `HELLO 3`. Default: delegate to the
161    /// RESP2 path (so a server that hasn't migrated any replies still
162    /// works correctly with a RESP3 client, per spec). Override per
163    /// command to emit RESP3 shapes (Map / Set / Double / …).
164    fn dispatch_into_resp3<A: ArgvView + ?Sized>(
165        &self,
166        store: &mut Store,
167        args: &A,
168        out: &mut Vec<u8>,
169    ) {
170        self.dispatch_into(store, args, out);
171    }
172    /// Classify a command for keyspace notifications. Returns `Some`
173    /// for write commands that should fire a notification when the
174    /// corresponding flag is enabled; `None` for read-only / no-op /
175    /// not-yet-classified commands (those never publish). Default
176    /// `None` so non-kevy embedders pay nothing.
177    fn notify_class<A: ArgvView + ?Sized>(&self, _args: &A) -> Option<NotifyClass> {
178        None
179    }
180
181    /// Handle `HELLO` — return the new connection protocol version + the
182    /// reply bytes. The runtime applies the new version to the conn
183    /// before scheduling the reply, so a `HELLO 3` ack itself comes out
184    /// shaped as a RESP3 Map (the new protocol is in effect for its own
185    /// reply).
186    ///
187    /// Default: ignore the args, keep `current_proto`, emit a minimal
188    /// RESP2 +OK so embedders that don't care still see a sane reply.
189    /// kevy's own impl in `kevy::KevyCommands` parses the optional
190    /// protover and emits the full server-info shape.
191    fn hello_reply<A: ArgvView + ?Sized>(
192        &self,
193        _args: &A,
194        current_proto: RespVersion,
195    ) -> (RespVersion, Vec<u8>) {
196        (current_proto, b"+OK\r\n".to_vec())
197    }
198    /// Whether this command should close the connection (QUIT).
199    fn is_quit<A: ArgvView + ?Sized>(&self, args: &A) -> bool;
200    /// Whether this command mutates the keyspace (so it must be logged to the AOF).
201    fn is_write<A: ArgvView + ?Sized>(&self, args: &A) -> bool;
202    /// Transaction-control classification (MULTI/EXEC/DISCARD vs anything else).
203    fn txn_kind<A: ArgvView + ?Sized>(&self, args: &A) -> TxnKind;
204    /// Called once per shard, immediately after [`Store::new`], before the
205    /// reactor enters its event loop. Implementations install per-shard
206    /// configuration that the runtime doesn't know about — currently the
207    /// `maxmemory` + eviction-policy pair, which kevy ships via its own
208    /// process-wide config snapshot. Default: no-op so non-kevy embedders
209    /// aren't forced to override.
210    fn on_shard_init(&self, _store: &mut Store) {}
211
212    /// Called once on the shard's own thread, first thing in the reactor
213    /// entry (both reactors), before restore/replay. Implementations that
214    /// need per-shard identity at dispatch time (e.g. kevy's `CLUSTER MYID`
215    /// / `CLUSTER NODES` `myself` flag) stash `shard` in a thread-local here
216    /// — in a thread-per-core runtime the current thread *is* the shard.
217    /// Default: no-op.
218    fn on_shard_start(&self, _shard: usize) {}
219
220    /// Per-tick persistence-stats publication: whether this shard has a
221    /// background save/rewrite in flight and how many AOF rewrites have
222    /// completed since open. Command layers that serve `INFO persistence`
223    /// stash these in a thread-local (thread-per-core: the answering
224    /// thread *is* the shard, same pattern as [`Self::on_shard_start`]).
225    /// Default: no-op.
226    fn on_persist_stats(&self, _in_flight: bool, _aof_rewrites_total: u64) {}
227
228    /// Per-tick replication-view publication: the answering shard's
229    /// current `master_repl_offset` (== `ReplicationSource::next_offset()`)
230    /// plus the per-replica `(ipv4, port, sent_offset)` triple for
231    /// every handshake-complete replica (in `AckSent`, `Streaming`,
232    /// or `SnapshotShipping`). `connected_slaves` for `INFO` /
233    /// `ROLE` is derived as `replicas.len()`.
234    /// Only called when this shard has a `ReplicationSource`
235    /// installed (i.e. `Runtime::with_replication(true, ...)` was
236    /// requested); standalone setups pay nothing. Command layers
237    /// that serve `ROLE` / `INFO replication` stash the values in a
238    /// thread-local (thread-per-core: the answering thread *is* the
239    /// shard, same pattern as [`Self::on_persist_stats`]). Default
240    /// no-op.
241    fn on_replication_view(
242        &self,
243        _master_repl_offset: u64,
244        _replicas: Vec<(std::net::Ipv4Addr, u16, u64)>,
245    ) {}
246
247    /// Periodic shard housekeeping (the equivalent of Redis's `serverCron`).
248    /// kevy uses this to run [`Store::tick_expire`] at the configured
249    /// `[expiry].hz`. Default no-op so non-kevy embedders / runtimes can
250    /// ignore it.
251    fn on_shard_tick(&self, _store: &mut Store) {}
252
253    /// Called once per client command at dispatch entry (before routing /
254    /// fan-out, so a multi-key command counts once). kevy uses it for
255    /// `INFO stats: total_commands_processed`. Hot path — keep it to a single
256    /// thread-local bump. Default no-op so non-kevy embedders pay nothing.
257    fn on_command(&self) {}
258
259    /// Called once per accepted client connection. kevy uses it for
260    /// `INFO stats: total_connections_received`. Default no-op.
261    fn on_connection(&self) {}
262
263    /// Interval between [`Self::on_shard_tick`] calls. Default 100 ms
264    /// (matching Redis's `hz = 10`). `0` disables ticking entirely.
265    fn shard_tick_interval_ms(&self) -> u64 {
266        100
267    }
268
269    /// Snapshot of the runtime-owned knobs that can be hot-modified
270    /// (the kevy server wires this to `CONFIG SET`). Called once per
271    /// shard tick — each `Some` value is applied to the shard's live
272    /// state; each `None` keeps the existing setting untouched.
273    ///
274    /// Default returns all-None so embedders that never hot-swap config
275    /// pay nothing beyond one struct-build per tick. The cost lives in
276    /// the impl's read of its own config source.
277    fn live_runtime_config(&self) -> LiveRuntimeConfig {
278        LiveRuntimeConfig::default()
279    }
280
281    /// Index into `args` of the key whose write may wake a blocked waiter
282    /// (`LPUSH` / `RPUSH` feed `BLPOP` / `BRPOP`; `XADD` feeds the stream
283    /// blocks). `Some(1)` for those verbs, `None` for everything else. The
284    /// in-shard fast path reads this off [`ResolvedCmd::wake_idx`]; the
285    /// cross-shard write path (`exec_op`, where a forwarded write
286    /// lands on the key's owning shard) re-derives it via this method since
287    /// the forwarded envelope doesn't carry the resolved hint. Default
288    /// `None` so non-blocking embedders pay nothing.
289    fn wake_idx<A: ArgvView + ?Sized>(&self, _args: &A) -> Option<u8> {
290        None
291    }
292
293    /// Classify a command for blocking semantics. `BlockHint::None`
294    /// (default) is the zero-cost answer for every non-blocking verb;
295    /// the dispatcher only registers a waiter when this returns
296    /// `BlockHint::Block` *and* the command's `dispatch_into` produced no
297    /// reply (i.e. it could not satisfy itself immediately — e.g. BLPOP
298    /// on an empty list). Concrete impls should fold this into their
299    /// override of [`Self::resolve`] so the verb-table lookup happens
300    /// once per command.
301    fn block_hint<A: ArgvView + ?Sized>(&self, _args: &A) -> BlockHint {
302        BlockHint::None
303    }
304
305    /// Rewrite `args` into the owned [`Argv`] that the dispatcher will
306    /// store as the parked waiter's command and replay on wake. Lets a
307    /// command set normalise positional ID / cursor arguments that would
308    /// otherwise re-resolve to a different value on retry — most notably
309    /// `XREAD BLOCK ... STREAMS k $`, where leaving `$` literal in the
310    /// retried argv causes a fresh re-resolve to the post-`XADD` last_id
311    /// and zero matching entries (the wake hangs).
312    ///
313    /// Default: just materialise the argv unchanged. Concrete impls only
314    /// need to override when a registered command carries an arg whose
315    /// meaning depends on store state at park time (`XREAD $`, the
316    /// classic case).
317    ///
318    /// For the cross-shard arbiter this runs on the **target** shard (the
319    /// one that owns the key) when the waiter is armed, so `$` snapshots
320    /// the target's real `last_id` — not the origin shard's (which may not
321    /// hold the stream at all).
322    fn resolve_block_argv<A: ArgvView + ?Sized>(
323        &self,
324        _store: &mut Store,
325        args: &A,
326        _kind: BlockKind,
327    ) -> Argv {
328        args.to_argv()
329    }
330
331    /// Build the **single-key** command the dispatcher will replay to
332    /// satisfy one watched `key` of a (possibly multi-key) blocking
333    /// command. `args` is the original command; `key` is one of its
334    /// watched keys. Returns an [`Argv`] that, when dispatched, pops /
335    /// reads only `key` — e.g. `BLPOP k1 k2 0` watching `k2` yields
336    /// `BLPOP k2 0`; `XREAD … STREAMS s1 s2 id1 id2` watching `s2`
337    /// yields `XREAD … STREAMS s2 id2`.
338    ///
339    /// Any state-dependent positional arg (`$`) is left **literal** here —
340    /// it's frozen later by [`Self::resolve_block_argv`] on the key's
341    /// owning shard. No store access needed (pure argv slicing). Default:
342    /// the unchanged argv (single-key blocking commands need no rewrite).
343    fn block_serve_argv<A: ArgvView + ?Sized>(
344        &self,
345        args: &A,
346        _kind: BlockKind,
347        _key: &[u8],
348    ) -> Argv {
349        args.to_argv()
350    }
351
352    /// Non-destructive readiness peek for a parked waiter: would replaying
353    /// `serve_argv` (built by [`Self::block_serve_argv`], `$` already
354    /// frozen) produce a reply right now? Runs on the key's owning shard
355    /// when arming and is the gate for emitting a cross-shard wake. Must
356    /// NOT mutate the store (no pop / no group-cursor advance). Default
357    /// `false` so non-blocking embedders never spuriously wake.
358    fn block_ready<A: ArgvView + ?Sized>(
359        &self,
360        _store: &mut Store,
361        _serve_argv: &A,
362        _kind: BlockKind,
363    ) -> bool {
364        false
365    }
366
367    /// Resolve all verb-dependent attributes in **one** verb-table lookup.
368    /// The default implementation calls the per-attribute methods above
369    /// (five upper_verb scans + matches); concrete impls SHOULD override
370    /// this with a single match so the reactor's hot path pays the verb-
371    /// resolution cost only once per command.
372    fn resolve<A: ArgvView + ?Sized>(&self, args: &A) -> ResolvedCmd {
373        ResolvedCmd {
374            txn_kind: self.txn_kind(args),
375            route: self.route(args),
376            is_quit: self.is_quit(args),
377            is_write: self.is_write(args),
378            block_hint: self.block_hint(args),
379            wake_idx: None,
380        }
381    }
382}
383
/// Per-command verb-resolution result. Produced once by [`Commands::resolve`]
/// in the reactor's parse-then-dispatch loop, reused for routing decisions,
/// AOF logging, and the QUIT branch — so the per-cmd `upper_verb` cost is
/// paid once per command instead of once per attribute.
pub struct ResolvedCmd {
    /// Transaction-control classification (see [`Commands::txn_kind`]).
    pub txn_kind: TxnKind,
    /// How the command is routed across shards (see [`Commands::route`]).
    pub route: Route,
    /// Whether the command should close the connection (see
    /// [`Commands::is_quit`]).
    pub is_quit: bool,
    /// Whether the command mutates the keyspace and so must be logged to the
    /// AOF (see [`Commands::is_write`]).
    pub is_write: bool,
    /// Blocking-command classification (see [`Commands::block_hint`]).
    /// `BlockHint::None` for every non-blocking verb.
    pub block_hint: BlockHint,
    /// Index into `args` whose write may wake a `BLPOP` / `XREAD BLOCK`
    /// waiter parked on that key — `Some(1)` for `LPUSH` / `RPUSH` /
    /// `XADD`, `None` for every other command (including reads). The
    /// dispatcher's wake hook is gated on both this being `Some` *and*
    /// the per-shard `BlockedClients` registry being non-empty, so the
    /// steady-state cost when nobody is parked is one `is_empty()` check.
    pub wake_idx: Option<u8>,
}
404
/// Keyspace-notification event class — what category a write command
/// belongs to, so the runtime can match it against the per-conn
/// `notify_keyspace_events` flags before publishing.
///
/// Returned by [`Commands::notify_class`]; each variant corresponds to one
/// boolean in [`NotificationFlags`] (see [`NotifyClass::enabled_in`]). The
/// letter quoted on each variant is the class character named in that
/// variant's description.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum NotifyClass {
    /// `g` — generic key commands (DEL / EXPIRE / PERSIST / RENAME / TYPE).
    Generic,
    /// `$` — string commands (SET / GETSET / INCR / APPEND / MSET).
    String,
    /// `l` — list commands (LPUSH / RPUSH / LPOP / LREM / LTRIM / …).
    List,
    /// `s` — set commands (SADD / SREM / SPOP / …).
    Set,
    /// `h` — hash commands (HSET / HDEL / HINCRBY / …).
    Hash,
    /// `z` — sorted-set commands (ZADD / ZREM / ZINCRBY / …).
    Zset,
    /// `t` — stream commands (XADD / XDEL / XTRIM / XGROUP / XACK /
    /// XCLAIM / XREADGROUP / …). Matches Redis's `t` class.
    Stream,
}
426
427impl NotifyClass {
428    /// Whether `flags` enables this event class.
429    #[inline]
430    pub fn enabled_in(self, flags: &NotificationFlags) -> bool {
431        match self {
432            NotifyClass::Generic => flags.generic,
433            NotifyClass::String => flags.string,
434            NotifyClass::List => flags.list,
435            NotifyClass::Set => flags.set,
436            NotifyClass::Hash => flags.hash,
437            NotifyClass::Zset => flags.zset,
438            NotifyClass::Stream => flags.stream,
439        }
440    }
441}
442
/// Transaction-control classification for a command.
///
/// Returned by [`Commands::txn_kind`] and carried in
/// [`ResolvedCmd::txn_kind`]. A plain fieldless tag, so it derives the same
/// common traits as [`NotifyClass`] and can be copied, compared and logged.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TxnKind {
    /// `MULTI`.
    Multi,
    /// `EXEC`.
    Exec,
    /// `DISCARD`.
    Discard,
    /// `WATCH` — outside MULTI runs the fan-out; inside MULTI is rejected
    /// with an error (Redis semantics: `WATCH inside MULTI is not allowed`).
    /// `UNWATCH` is plain [`Self::Other`] — outside MULTI it routes to
    /// [`Route::Unwatch`] (clear + OK); inside MULTI it queues as a no-op
    /// that dispatch resolves to +OK at EXEC time.
    Watch,
    /// Any command that is not one of the transaction-control verbs above
    /// (this includes `UNWATCH`).
    Other,
}
456
/// Live snapshot of the runtime-owned knobs that may have been changed
/// since this shard's last tick. Built by the [`Commands`] impl from
/// its own config source (e.g. kevy reads `config_global`) and returned
/// from [`Commands::live_runtime_config`]. Each `Some(_)` is applied to
/// the shard; each `None` leaves the existing setting alone.
///
/// Every field is an `Option`, so the derived `Default` is the
/// "change nothing" snapshot.
///
/// One snapshot is built per tick (every 100 ms by default), so its
/// cost is amortised across thousands of commands.
#[derive(Debug, Default, Clone, Copy)]
pub struct LiveRuntimeConfig {
    /// AOF fsync policy. Applied via `Aof::set_fsync` — switching to
    /// `Always` mid-flight also flushes any buffered bytes so the new
    /// "every write is on disk before reply" contract is honoured from
    /// the next append onward.
    pub appendfsync: Option<Fsync>,
    /// `auto_aof_rewrite_percentage`. `0` disables the auto-trigger.
    pub auto_aof_rewrite_pct: Option<u32>,
    /// `auto_aof_rewrite_min_size` in bytes.
    pub auto_aof_rewrite_min_size: Option<u64>,
    /// New tick interval in ms (`1000/hz`). `0` disables ticking
    /// entirely — note that disabling also turns off active TTL
    /// expiry and the auto-rewrite tick path. Lazy expiry on access
    /// always still works.
    pub tick_interval_ms: Option<u64>,
    /// `notify_keyspace_events` flags. Parsed by the [`Commands`]
    /// impl from its config source (e.g. kevy reads
    /// `config_global` + [`kevy_config::parse_notification_flags`]).
    /// Default-empty flags mean OFF — writes pay one bool-OR check
    /// and skip every per-key keyspace notification publish.
    pub notify_flags: Option<NotificationFlags>,
    /// `[slowlog].slower_than_micros` — `-1` disables, `0` records all,
    /// `>0` is the strict micros threshold. `None` keeps the existing
    /// shard setting (set by the [`Runtime`] builder at startup).
    pub slowlog_slower_than_micros: Option<i64>,
    /// `[slowlog].max_len` — ring cap per shard. Shrinking trims the
    /// oldest entries on the next tick application.
    pub slowlog_max_len: Option<u32>,
}