skeg_server/lib.rs
1#![deny(unsafe_code)]
2// Server::bind_* funcs take many tuning knobs (workers, mmap flags,
3// shard count, ...). Wrapping them in a config struct would push the
4// complexity to every call site without a real win.
5#![allow(clippy::too_many_arguments)]
6
7//! `skeg-server` - TCP server library.
8//!
9//! Ships single-tenant by default. A separate crate (see the
10//! `tenant` module docs) can install a multi-tenant layer at runtime
11//! via [`Server::with_tenant_backend`].
12
13pub mod handler;
14pub mod resp3_handler;
15pub mod shard;
16pub mod tenant;
17
18use std::net::SocketAddr;
19use std::path::Path;
20use std::sync::Arc;
21use std::time::Duration;
22use tokio::net::{TcpListener, TcpStream};
23use tracing::{info, warn};
24
25use handler::handle_connection;
26use resp3_handler::handle_connection_resp3;
27use shard::ShardSet;
28use skeg_vector::QuantKind;
29pub use tenant::{AnonymousPolicy, TenantBackend, TenantId};
30
31pub struct Server {
32 listener: TcpListener,
33 shards: ShardSet,
34 /// Optional multi-tenant backend. `None` keeps single-tenant
35 /// semantics; wiring an `Arc<dyn TenantBackend>` enables RESP3
36 /// AUTH + per-tenant key scoping on this listener.
37 tenant_backend: Option<Arc<dyn TenantBackend>>,
38}
39
40impl Server {
41 /// Bind the server to `addr` with data sharded under `data_dir`.
42 ///
43 /// The shard count equals the number of performance cores.
44 ///
45 /// # Errors
46 ///
47 /// Returns an error if the address cannot be bound or a shard cannot start.
48 pub async fn bind(
49 addr: impl tokio::net::ToSocketAddrs,
50 data_dir: &Path,
51 ) -> std::io::Result<Self> {
52 let n_shards = skeg_platform::num_performance_cores();
53 Self::bind_full(addr, data_dir, n_shards, 0, false).await
54 }
55
56 /// Bind the server with an explicit shard count and worker-pool size.
57 ///
58 /// `workers == 0` (default) keeps VSEARCH inline on the shard thread
59 /// (Personal AI default). `workers > 0` dispatches VSEARCH to a tokio
60 /// blocking pool so KV ops do not queue behind multi-ms vector searches
61 /// (multi-tenant pattern, opt-in via `--workers N` on the CLI).
62 ///
63 /// # Errors
64 ///
65 /// Returns an error if the address cannot be bound or a shard cannot start.
66 pub async fn bind_with_shards(
67 addr: impl tokio::net::ToSocketAddrs,
68 data_dir: &Path,
69 n_shards: usize,
70 workers: usize,
71 ) -> std::io::Result<Self> {
72 Self::bind_full(addr, data_dir, n_shards, workers, false).await
73 }
74
75 /// Full-knob constructor for the read-write path: shard count, worker
76 /// pool, and the opt-in `mmap_tier` flag (see `--tier-mmap` in the
77 /// server CLI). Other entry points delegate here with defaults.
78 ///
79 /// # Errors
80 ///
81 /// Returns an error if the address cannot be bound or a shard cannot start.
82 pub async fn bind_full(
83 addr: impl tokio::net::ToSocketAddrs,
84 data_dir: &Path,
85 n_shards: usize,
86 workers: usize,
87 mmap_tier: bool,
88 ) -> std::io::Result<Self> {
89 Self::bind_full_mmap(addr, data_dir, n_shards, workers, mmap_tier, false).await
90 }
91
92 /// All-knobs constructor for the read-write path. Adds `mmap_graph`
93 /// to [`bind_full`](Self::bind_full).
94 ///
95 /// # Errors
96 ///
97 /// Returns an error if the address cannot be bound or a shard cannot start.
98 pub async fn bind_full_mmap(
99 addr: impl tokio::net::ToSocketAddrs,
100 data_dir: &Path,
101 n_shards: usize,
102 workers: usize,
103 mmap_tier: bool,
104 mmap_graph: bool,
105 ) -> std::io::Result<Self> {
106 let listener = TcpListener::bind(addr).await?;
107 let shards = ShardSet::open_mode_full_mmap(
108 data_dir,
109 n_shards,
110 false,
111 QuantKind::Int8,
112 workers,
113 mmap_tier,
114 mmap_graph,
115 )?;
116 Ok(Self {
117 listener,
118 shards,
119 tenant_backend: None,
120 })
121 }
122
123 /// Install a multi-tenant backend on a server already built by one
124 /// of the `bind*` constructors. Builder-style for clarity at the
125 /// call site. When set, the RESP3 handler honours `HELLO 3 AUTH`
126 /// and scopes KV / vector ops by tenant id.
127 #[must_use]
128 pub fn with_tenant_backend(mut self, backend: Arc<dyn TenantBackend>) -> Self {
129 self.tenant_backend = Some(backend);
130 self
131 }
132
133 /// Bind the server in serve mode: a single shard over the offline-built
134 /// index at `data_dir`, read-only. Every mutation (KV and vector) is
135 /// rejected; the index is served at its clean resident footprint.
136 ///
137 /// `data_dir` is a directory produced by `skeg-tool build`. `tier` is the
138 /// tier-1 quantisation built for the served index: `QuantKind::Int8` or
139 /// `QuantKind::Pq { m, k }` (smaller footprint). `workers > 0` enables
140 /// the VSEARCH dispatch pool described in [`bind_with_shards`].
141 ///
142 /// # Errors
143 ///
144 /// Returns an error if the address cannot be bound or the shard cannot
145 /// start.
146 pub async fn bind_serve(
147 addr: impl tokio::net::ToSocketAddrs,
148 data_dir: &Path,
149 tier: QuantKind,
150 workers: usize,
151 ) -> std::io::Result<Self> {
152 Self::bind_serve_full(addr, data_dir, tier, workers, false).await
153 }
154
155 /// Full-knob serve mode: like [`bind_serve`] plus the opt-in
156 /// `mmap_tier` flag that swaps the TurboQuant codes for a
157 /// memory-mapped view of `tier.cache.bin` at open.
158 ///
159 /// # Errors
160 ///
161 /// Returns an error if the address cannot be bound or the shard cannot start.
162 pub async fn bind_serve_full(
163 addr: impl tokio::net::ToSocketAddrs,
164 data_dir: &Path,
165 tier: QuantKind,
166 workers: usize,
167 mmap_tier: bool,
168 ) -> std::io::Result<Self> {
169 Self::bind_serve_full_mmap(addr, data_dir, tier, workers, mmap_tier, false).await
170 }
171
172 /// All-knobs serve mode. Adds `mmap_graph` to
173 /// [`bind_serve_full`](Self::bind_serve_full).
174 ///
175 /// # Errors
176 ///
177 /// Returns an error if the address cannot be bound or the shard cannot start.
178 pub async fn bind_serve_full_mmap(
179 addr: impl tokio::net::ToSocketAddrs,
180 data_dir: &Path,
181 tier: QuantKind,
182 workers: usize,
183 mmap_tier: bool,
184 mmap_graph: bool,
185 ) -> std::io::Result<Self> {
186 let listener = TcpListener::bind(addr).await?;
187 let shards =
188 ShardSet::open_mode_full_mmap(data_dir, 1, true, tier, workers, mmap_tier, mmap_graph)?;
189 Ok(Self {
190 listener,
191 shards,
192 tenant_backend: None,
193 })
194 }
195
196 /// Return the local address the server is listening on.
197 ///
198 /// # Errors
199 ///
200 /// Returns an error if the OS cannot retrieve the socket address.
201 pub fn local_addr(&self) -> std::io::Result<SocketAddr> {
202 self.listener.local_addr()
203 }
204
205 /// Number of shards backing this server.
206 #[must_use]
207 pub fn n_shards(&self) -> usize {
208 self.shards.n_shards()
209 }
210
211 /// Accept connections and handle them until an I/O error on `accept`.
212 ///
213 /// # Errors
214 ///
215 /// Returns the first error from `TcpListener::accept`.
216 pub async fn run(self) -> std::io::Result<()> {
217 let Self {
218 listener,
219 shards,
220 tenant_backend: _,
221 } = self;
222 info!(addr = ?listener.local_addr()?, n_shards = shards.n_shards(), "server listening (binary protocol)");
223 loop {
224 let (stream, _) = listener.accept().await?;
225 tune_socket(&stream);
226 let shards = shards.clone();
227 tokio::spawn(async move {
228 handle_connection(stream, shards).await;
229 });
230 }
231 }
232
233 /// Like `run`, but speaks RESP3 (Redis wire) on the listener instead of
234 /// the skeg binary protocol. Same shard set, same storage, different
235 /// encoding.
236 ///
237 /// # Errors
238 ///
239 /// Returns the first error from `TcpListener::accept`.
240 pub async fn run_resp3(self) -> std::io::Result<()> {
241 let Self {
242 listener,
243 shards,
244 tenant_backend,
245 } = self;
246 info!(
247 addr = ?listener.local_addr()?,
248 n_shards = shards.n_shards(),
249 tenant = tenant_backend.is_some(),
250 "server listening (RESP3)"
251 );
252 loop {
253 let (stream, _) = listener.accept().await?;
254 tune_socket(&stream);
255 let shards = shards.clone();
256 let backend = tenant_backend.clone();
257 tokio::spawn(async move {
258 handle_connection_resp3(stream, shards, backend).await;
259 });
260 }
261 }
262}
263
264/// Apply per-connection socket tuning: `TCP_NODELAY` for low-latency
265/// request/reply traffic, and `SO_KEEPALIVE` + `TCP_KEEPIDLE` so a
266/// half-open connection (peer dropped without FIN) gets detected
267/// before the OS default of ~2h. Failures here are logged and
268/// swallowed because they don't prevent the connection from working
269/// (they just degrade tail-case behaviour).
270fn tune_socket(stream: &TcpStream) {
271 if let Err(e) = stream.set_nodelay(true) {
272 warn!("set_nodelay failed: {e}");
273 }
274 let sock = socket2::SockRef::from(stream);
275 let ka = socket2::TcpKeepalive::new()
276 .with_time(Duration::from_secs(60))
277 .with_interval(Duration::from_secs(10));
278 if let Err(e) = sock.set_tcp_keepalive(&ka) {
279 warn!("set_tcp_keepalive failed: {e}");
280 }
281}