skeg_server/lib.rs
1#![deny(unsafe_code)]
2// Server::bind_* funcs take many tuning knobs (workers, mmap flags,
3// shard count, ...). Wrapping them in a config struct would push the
4// complexity to every call site without a real win.
5#![allow(clippy::too_many_arguments)]
6
7//! `skeg-server` - TCP server library.
8//!
9//! Ships single-tenant by default. A separate crate (see the
10//! `tenant` module docs) can install a multi-tenant layer at runtime
11//! via [`Server::with_tenant_backend`].
12
13pub mod handler;
14pub mod quota;
15pub mod resp3_handler;
16pub mod shard;
17pub mod tenant;
18#[cfg(feature = "tracing-otlp")]
19pub mod tracing_otlp;
20
21use std::net::SocketAddr;
22use std::path::Path;
23use std::sync::Arc;
24use std::time::Duration;
25use tokio::net::{TcpListener, TcpStream};
26use tracing::{info, warn};
27
28use handler::handle_connection;
29pub use quota::{TenantLimits, TenantVectorQuota};
30use resp3_handler::handle_connection_resp3;
31use shard::ShardSet;
32use skeg_vector::QuantKind;
33pub use tenant::{AnonymousPolicy, QuotaAdminError, TenantBackend, TenantId};
34
35pub struct Server {
36 listener: TcpListener,
37 shards: ShardSet,
38 /// Optional multi-tenant backend. `None` keeps single-tenant
39 /// semantics; wiring an `Arc<dyn TenantBackend>` enables RESP3
40 /// AUTH + per-tenant key scoping on this listener.
41 tenant_backend: Option<Arc<dyn TenantBackend>>,
42}
43
44impl Server {
45 /// Bind the server to `addr` with data sharded under `data_dir`.
46 ///
47 /// The shard count equals the number of performance cores.
48 ///
49 /// # Errors
50 ///
51 /// Returns an error if the address cannot be bound or a shard cannot start.
52 pub async fn bind(
53 addr: impl tokio::net::ToSocketAddrs,
54 data_dir: &Path,
55 ) -> std::io::Result<Self> {
56 let n_shards = skeg_platform::num_performance_cores();
57 Self::bind_full(addr, data_dir, n_shards, 0, false).await
58 }
59
60 /// Bind the server with an explicit shard count and worker-pool size.
61 ///
62 /// `workers == 0` (default) keeps VSEARCH inline on the shard thread
63 /// (Personal AI default). `workers > 0` dispatches VSEARCH to a tokio
64 /// blocking pool so KV ops do not queue behind multi-ms vector searches
65 /// (multi-tenant pattern, opt-in via `--workers N` on the CLI).
66 ///
67 /// # Errors
68 ///
69 /// Returns an error if the address cannot be bound or a shard cannot start.
70 pub async fn bind_with_shards(
71 addr: impl tokio::net::ToSocketAddrs,
72 data_dir: &Path,
73 n_shards: usize,
74 workers: usize,
75 ) -> std::io::Result<Self> {
76 Self::bind_full(addr, data_dir, n_shards, workers, false).await
77 }
78
79 /// Full-knob constructor for the read-write path: shard count, worker
80 /// pool, and the opt-in `mmap_tier` flag (see `--tier-mmap` in the
81 /// server CLI). Other entry points delegate here with defaults.
82 ///
83 /// # Errors
84 ///
85 /// Returns an error if the address cannot be bound or a shard cannot start.
86 pub async fn bind_full(
87 addr: impl tokio::net::ToSocketAddrs,
88 data_dir: &Path,
89 n_shards: usize,
90 workers: usize,
91 mmap_tier: bool,
92 ) -> std::io::Result<Self> {
93 Self::bind_full_mmap(addr, data_dir, n_shards, workers, mmap_tier, false).await
94 }
95
96 /// All-knobs constructor for the read-write path. Adds `mmap_graph`
97 /// to [`bind_full`](Self::bind_full).
98 ///
99 /// # Errors
100 ///
101 /// Returns an error if the address cannot be bound or a shard cannot start.
102 pub async fn bind_full_mmap(
103 addr: impl tokio::net::ToSocketAddrs,
104 data_dir: &Path,
105 n_shards: usize,
106 workers: usize,
107 mmap_tier: bool,
108 mmap_graph: bool,
109 ) -> std::io::Result<Self> {
110 let listener = TcpListener::bind(addr).await?;
111 let shards = ShardSet::open_mode_full_mmap(
112 data_dir,
113 n_shards,
114 false,
115 QuantKind::Int8,
116 workers,
117 mmap_tier,
118 mmap_graph,
119 )?;
120 Ok(Self {
121 listener,
122 shards,
123 tenant_backend: None,
124 })
125 }
126
127 /// Install a multi-tenant backend on a server already built by one
128 /// of the `bind*` constructors. Builder-style for clarity at the
129 /// call site. When set, the RESP3 handler honours `HELLO 3 AUTH`
130 /// and scopes KV / vector ops by tenant id.
131 #[must_use]
132 pub fn with_tenant_backend(mut self, backend: Arc<dyn TenantBackend>) -> Self {
133 self.tenant_backend = Some(backend);
134 self
135 }
136
137 /// Bind the server in serve mode: a single shard over the offline-built
138 /// index at `data_dir`, read-only. Every mutation (KV and vector) is
139 /// rejected; the index is served at its clean resident footprint.
140 ///
141 /// `data_dir` is a directory produced by `skeg-tool build`. `tier` is the
142 /// tier-1 quantisation built for the served index: `QuantKind::Int8` or
143 /// `QuantKind::Pq { m, k }` (smaller footprint). `workers > 0` enables
144 /// the VSEARCH dispatch pool described in [`bind_with_shards`].
145 ///
146 /// # Errors
147 ///
148 /// Returns an error if the address cannot be bound or the shard cannot
149 /// start.
150 pub async fn bind_serve(
151 addr: impl tokio::net::ToSocketAddrs,
152 data_dir: &Path,
153 tier: QuantKind,
154 workers: usize,
155 ) -> std::io::Result<Self> {
156 Self::bind_serve_full(addr, data_dir, tier, workers, false).await
157 }
158
159 /// Full-knob serve mode: like [`bind_serve`] plus the opt-in
160 /// `mmap_tier` flag that swaps the TurboQuant codes for a
161 /// memory-mapped view of `tier.cache.bin` at open.
162 ///
163 /// # Errors
164 ///
165 /// Returns an error if the address cannot be bound or the shard cannot start.
166 pub async fn bind_serve_full(
167 addr: impl tokio::net::ToSocketAddrs,
168 data_dir: &Path,
169 tier: QuantKind,
170 workers: usize,
171 mmap_tier: bool,
172 ) -> std::io::Result<Self> {
173 Self::bind_serve_full_mmap(addr, data_dir, tier, workers, mmap_tier, false).await
174 }
175
176 /// All-knobs serve mode. Adds `mmap_graph` to
177 /// [`bind_serve_full`](Self::bind_serve_full).
178 ///
179 /// # Errors
180 ///
181 /// Returns an error if the address cannot be bound or the shard cannot start.
182 pub async fn bind_serve_full_mmap(
183 addr: impl tokio::net::ToSocketAddrs,
184 data_dir: &Path,
185 tier: QuantKind,
186 workers: usize,
187 mmap_tier: bool,
188 mmap_graph: bool,
189 ) -> std::io::Result<Self> {
190 let listener = TcpListener::bind(addr).await?;
191 let shards =
192 ShardSet::open_mode_full_mmap(data_dir, 1, true, tier, workers, mmap_tier, mmap_graph)?;
193 Ok(Self {
194 listener,
195 shards,
196 tenant_backend: None,
197 })
198 }
199
200 /// Return the local address the server is listening on.
201 ///
202 /// # Errors
203 ///
204 /// Returns an error if the OS cannot retrieve the socket address.
205 pub fn local_addr(&self) -> std::io::Result<SocketAddr> {
206 self.listener.local_addr()
207 }
208
209 /// Number of shards backing this server.
210 #[must_use]
211 pub fn n_shards(&self) -> usize {
212 self.shards.n_shards()
213 }
214
215 /// Accept connections and handle them until an I/O error on `accept`.
216 ///
217 /// # Errors
218 ///
219 /// Returns the first error from `TcpListener::accept`.
220 pub async fn run(self) -> std::io::Result<()> {
221 let Self {
222 listener,
223 shards,
224 tenant_backend: _,
225 } = self;
226 info!(addr = ?listener.local_addr()?, n_shards = shards.n_shards(), "server listening (binary protocol)");
227 loop {
228 let (stream, _) = listener.accept().await?;
229 tune_socket(&stream);
230 let shards = shards.clone();
231 tokio::spawn(async move {
232 handle_connection(stream, shards).await;
233 });
234 }
235 }
236
237 /// Like `run`, but speaks RESP3 (Redis wire) on the listener instead of
238 /// the skeg binary protocol. Same shard set, same storage, different
239 /// encoding.
240 ///
241 /// # Errors
242 ///
243 /// Returns the first error from `TcpListener::accept`.
244 pub async fn run_resp3(self) -> std::io::Result<()> {
245 let Self {
246 listener,
247 shards,
248 tenant_backend,
249 } = self;
250 info!(
251 addr = ?listener.local_addr()?,
252 n_shards = shards.n_shards(),
253 tenant = tenant_backend.is_some(),
254 "server listening (RESP3)"
255 );
256 loop {
257 let (stream, _) = listener.accept().await?;
258 tune_socket(&stream);
259 let shards = shards.clone();
260 let backend = tenant_backend.clone();
261 tokio::spawn(async move {
262 handle_connection_resp3(stream, shards, backend).await;
263 });
264 }
265 }
266}
267
268/// Apply per-connection socket tuning: `TCP_NODELAY` for low-latency
269/// request/reply traffic, and `SO_KEEPALIVE` + `TCP_KEEPIDLE` so a
270/// half-open connection (peer dropped without FIN) gets detected
271/// before the OS default of ~2h. Failures here are logged and
272/// swallowed because they don't prevent the connection from working
273/// (they just degrade tail-case behaviour).
274fn tune_socket(stream: &TcpStream) {
275 if let Err(e) = stream.set_nodelay(true) {
276 warn!("set_nodelay failed: {e}");
277 }
278 let sock = socket2::SockRef::from(stream);
279 let ka = socket2::TcpKeepalive::new()
280 .with_time(Duration::from_secs(60))
281 .with_interval(Duration::from_secs(10));
282 if let Err(e) = sock.set_tcp_keepalive(&ka) {
283 warn!("set_tcp_keepalive failed: {e}");
284 }
285}