kevy_rt/runtime_builders.rs
1//! Runtime builder methods split out of [`crate::runtime`] so that
2//! file stays under the 500-LOC project ceiling. Same `impl Runtime<C>`,
3//! split purely by responsibility: construction + boot live in
4//! `runtime.rs`; the `with_*` configuration setters live here.
5
6use std::path::PathBuf;
7
8use kevy_persist::Fsync;
9
10use crate::Commands;
11use crate::runtime::Runtime;
12
13impl<C: Commands> Runtime<C> {
14 /// v3-cluster replication producer side: when `enabled`, each shard
15 /// runs a per-shard `ReplicationSource` with `buffer_size` byte
16 /// budget. Every applied mutation is pushed to the backlog for
17 /// connected replicas to consume. `enabled = false` (default) is
18 /// zero hot-path cost — each write checks `Option::is_some()` and
19 /// skips. The replication TCP listener / streaming loop arrive in
20 /// subsequent v3-cluster tasks (T1.12+); enabling without those
21 /// landed means the backlog fills and frames are dropped per the
22 /// source's eviction policy, but writes proceed normally.
23 #[must_use]
24 pub fn with_replication(mut self, enabled: bool, buffer_size: u64) -> Self {
25 self.enable_replication = enabled;
26 if buffer_size > 0 {
27 self.replication_buffer_size = buffer_size;
28 }
29 self
30 }
31
32 /// Bring up a replication listener per shard at
33 /// `port_base + shard_id` (per Issue Ledger I2 — mirrors the
34 /// cluster listener pattern). Replica clients connect to each
35 /// per-shard port to mirror the full keyspace. This is independent
36 /// of [`Self::with_replication`]: a primary that runs the producer
37 /// backlog without a listener (benchmarks, embed-only) is
38 /// supported.
39 #[must_use]
40 pub fn with_replication_listener(mut self, port_base: u16) -> Self {
41 self.replication_port_base = Some(port_base);
42 self
43 }
44
45 /// Per-shard SlotTable reconnect window in milliseconds — the
46 /// grace period a disconnected replica's slot is retained for so
47 /// a reconnect within the window can be correlated against its
48 /// prior `sent_offset`. Default `60_000` (60 s); pass `0` to drop
49 /// slots immediately on disconnect.
50 #[must_use]
51 pub fn with_replication_reconnect_window(mut self, ms: u32) -> Self {
52 self.replication_reconnect_window_ms = ms;
53 self
54 }
55
56 /// Install per-shard replica inboxes (T1.29). The embedder pre-
57 /// constructs `nshards` inbox pairs via
58 /// [`crate::replica_inbox_pair`], keeps the senders to hand to
59 /// the per-shard replica runner threads, and passes the receivers
60 /// here. The order of `receivers` is shard-major: index `i` ↔
61 /// shard `i`. Length must equal `nshards`. When this builder
62 /// isn't called, no shard has an inbox (the standalone /
63 /// primary-only behaviour pre-T1.29).
64 #[must_use]
65 pub fn with_replica_inboxes(
66 mut self,
67 receivers: Vec<crate::replica_inbox::ReplicaInboxReceiver>,
68 ) -> Self {
69 self.replica_inboxes = receivers.into_iter().map(Some).collect();
70 self
71 }
72
73 /// Enable single-node cluster mode: keys route by Redis-cluster slot
74 /// (CRC16 `{hashtag}` & 16383, contiguous even ranges) and every shard
75 /// `i` binds a second, deterministic listener at `port_base + i` that
76 /// answers wrong-shard keys with `-MOVED` instead of forwarding. The
77 /// SO_REUSEPORT listener on the main port keeps today's full
78 /// forward-anywhere behaviour for non-cluster clients.
79 #[must_use]
80 pub fn with_cluster(mut self, port_base: u16) -> Self {
81 self.cluster_port_base = Some(port_base);
82 self
83 }
84
85 /// SLOWLOG tuning (`[slowlog]` config section). Default
86 /// `slower_than_micros = -1` (OFF) so the hot path never reads the
87 /// clock — every enabled command otherwise pays an `Instant::now()`
88 /// pair around dispatch, ~30 ns/op (≈9 % at 3 M ops/s). To match
89 /// Redis's 10 ms default, pass `10_000`; `0` records all; `-1`
90 /// disables. `max_len` is the per-shard ring cap (default 128).
91 #[must_use]
92 pub fn with_slowlog(mut self, slower_than_micros: i64, max_len: u32) -> Self {
93 self.slowlog_slower_than_micros = slower_than_micros;
94 self.slowlog_max_len = max_len;
95 self
96 }
97
98 /// Reactor tuning knobs (`[advanced]` config section). Defaults
99 /// match the pre-v1.4 hardcoded constants. `ring_capacity` is
100 /// applied at SPSC ring construction (startup only); the other
101 /// three are read at each iteration of the reactor loop, so
102 /// values applied here take effect from the next shard.run() call.
103 #[must_use]
104 pub fn with_advanced(
105 mut self,
106 spin_limit: u32,
107 park_timeout_ms: u32,
108 tick_check_every: u32,
109 ring_capacity: usize,
110 ) -> Self {
111 self.spin_limit = spin_limit;
112 self.park_timeout_ms = park_timeout_ms;
113 self.tick_check_every = tick_check_every;
114 self.ring_capacity = ring_capacity;
115 self
116 }
117
118 /// Set the directory where shards snapshot to / load from. Default: `.`.
119 #[must_use]
120 pub fn with_data_dir(mut self, dir: impl Into<PathBuf>) -> Self {
121 self.data_dir = dir.into();
122 self
123 }
124
125 /// Enable/disable the append-only log. Default: enabled.
126 #[must_use]
127 pub fn with_aof(mut self, on: bool) -> Self {
128 self.enable_aof = on;
129 self
130 }
131
132 /// fsync policy for the AOF. Default `EverySec` matches Redis (lose at
133 /// most ~1 s of writes on a crash). `Always` is zero-loss but ~50 %
134 /// throughput; `No` defers everything to the OS pagecache.
135 #[must_use]
136 pub fn with_appendfsync(mut self, fsync: Fsync) -> Self {
137 self.appendfsync = fsync;
138 self
139 }
140
141 /// Auto-trigger BGREWRITEAOF when the live AOF has grown by at least
142 /// `pct` percent above its size at the previous rewrite, AND is at
143 /// least `min_size` bytes. `pct=0` disables auto-rewrite (clients can
144 /// still run BGREWRITEAOF manually). Defaults: 100 % / 64 MiB.
145 #[must_use]
146 pub fn with_auto_aof_rewrite(mut self, pct: u32, min_size: u64) -> Self {
147 self.auto_aof_rewrite_pct = pct;
148 self.auto_aof_rewrite_min_size = min_size;
149 self
150 }
151}