Skip to main content

skeg_server/
lib.rs

1#![deny(unsafe_code)]
2// Server::bind_* funcs take many tuning knobs (workers, mmap flags,
3// shard count, ...). Wrapping them in a config struct would push the
4// complexity to every call site without a real win.
5#![allow(clippy::too_many_arguments)]
6
7//! `skeg-server` - TCP server library.
8//!
9//! Ships single-tenant by default. A separate crate (see the
10//! `tenant` module docs) can install a multi-tenant layer at runtime
11//! via [`Server::with_tenant_backend`].
12
13pub mod handler;
14pub mod resp3_handler;
15pub mod shard;
16pub mod tenant;
17
18use std::net::SocketAddr;
19use std::path::Path;
20use std::sync::Arc;
21use std::time::Duration;
22use tokio::net::{TcpListener, TcpStream};
23use tracing::{info, warn};
24
25use handler::handle_connection;
26use resp3_handler::handle_connection_resp3;
27use shard::ShardSet;
28use skeg_vector::QuantKind;
29pub use tenant::{AnonymousPolicy, TenantBackend, TenantId};
30
31pub struct Server {
32    listener: TcpListener,
33    shards: ShardSet,
34    /// Optional multi-tenant backend. `None` keeps single-tenant
35    /// semantics; wiring an `Arc<dyn TenantBackend>` enables RESP3
36    /// AUTH + per-tenant key scoping on this listener.
37    tenant_backend: Option<Arc<dyn TenantBackend>>,
38}
39
40impl Server {
41    /// Bind the server to `addr` with data sharded under `data_dir`.
42    ///
43    /// The shard count equals the number of performance cores.
44    ///
45    /// # Errors
46    ///
47    /// Returns an error if the address cannot be bound or a shard cannot start.
48    pub async fn bind(
49        addr: impl tokio::net::ToSocketAddrs,
50        data_dir: &Path,
51    ) -> std::io::Result<Self> {
52        let n_shards = skeg_platform::num_performance_cores();
53        Self::bind_full(addr, data_dir, n_shards, 0, false).await
54    }
55
56    /// Bind the server with an explicit shard count and worker-pool size.
57    ///
58    /// `workers == 0` (default) keeps VSEARCH inline on the shard thread
59    /// (Personal AI default). `workers > 0` dispatches VSEARCH to a tokio
60    /// blocking pool so KV ops do not queue behind multi-ms vector searches
61    /// (multi-tenant pattern, opt-in via `--workers N` on the CLI).
62    ///
63    /// # Errors
64    ///
65    /// Returns an error if the address cannot be bound or a shard cannot start.
66    pub async fn bind_with_shards(
67        addr: impl tokio::net::ToSocketAddrs,
68        data_dir: &Path,
69        n_shards: usize,
70        workers: usize,
71    ) -> std::io::Result<Self> {
72        Self::bind_full(addr, data_dir, n_shards, workers, false).await
73    }
74
75    /// Full-knob constructor for the read-write path: shard count, worker
76    /// pool, and the opt-in `mmap_tier` flag (see `--tier-mmap` in the
77    /// server CLI). Other entry points delegate here with defaults.
78    ///
79    /// # Errors
80    ///
81    /// Returns an error if the address cannot be bound or a shard cannot start.
82    pub async fn bind_full(
83        addr: impl tokio::net::ToSocketAddrs,
84        data_dir: &Path,
85        n_shards: usize,
86        workers: usize,
87        mmap_tier: bool,
88    ) -> std::io::Result<Self> {
89        Self::bind_full_mmap(addr, data_dir, n_shards, workers, mmap_tier, false).await
90    }
91
92    /// All-knobs constructor for the read-write path. Adds `mmap_graph`
93    /// to [`bind_full`](Self::bind_full).
94    ///
95    /// # Errors
96    ///
97    /// Returns an error if the address cannot be bound or a shard cannot start.
98    pub async fn bind_full_mmap(
99        addr: impl tokio::net::ToSocketAddrs,
100        data_dir: &Path,
101        n_shards: usize,
102        workers: usize,
103        mmap_tier: bool,
104        mmap_graph: bool,
105    ) -> std::io::Result<Self> {
106        let listener = TcpListener::bind(addr).await?;
107        let shards = ShardSet::open_mode_full_mmap(
108            data_dir,
109            n_shards,
110            false,
111            QuantKind::Int8,
112            workers,
113            mmap_tier,
114            mmap_graph,
115        )?;
116        Ok(Self {
117            listener,
118            shards,
119            tenant_backend: None,
120        })
121    }
122
123    /// Install a multi-tenant backend on a server already built by one
124    /// of the `bind*` constructors. Builder-style for clarity at the
125    /// call site. When set, the RESP3 handler honours `HELLO 3 AUTH`
126    /// and scopes KV / vector ops by tenant id.
127    #[must_use]
128    pub fn with_tenant_backend(mut self, backend: Arc<dyn TenantBackend>) -> Self {
129        self.tenant_backend = Some(backend);
130        self
131    }
132
133    /// Bind the server in serve mode: a single shard over the offline-built
134    /// index at `data_dir`, read-only. Every mutation (KV and vector) is
135    /// rejected; the index is served at its clean resident footprint.
136    ///
137    /// `data_dir` is a directory produced by `skeg-tool build`. `tier` is the
138    /// tier-1 quantisation built for the served index: `QuantKind::Int8` or
139    /// `QuantKind::Pq { m, k }` (smaller footprint). `workers > 0` enables
140    /// the VSEARCH dispatch pool described in [`bind_with_shards`].
141    ///
142    /// # Errors
143    ///
144    /// Returns an error if the address cannot be bound or the shard cannot
145    /// start.
146    pub async fn bind_serve(
147        addr: impl tokio::net::ToSocketAddrs,
148        data_dir: &Path,
149        tier: QuantKind,
150        workers: usize,
151    ) -> std::io::Result<Self> {
152        Self::bind_serve_full(addr, data_dir, tier, workers, false).await
153    }
154
155    /// Full-knob serve mode: like [`bind_serve`] plus the opt-in
156    /// `mmap_tier` flag that swaps the TurboQuant codes for a
157    /// memory-mapped view of `tier.cache.bin` at open.
158    ///
159    /// # Errors
160    ///
161    /// Returns an error if the address cannot be bound or the shard cannot start.
162    pub async fn bind_serve_full(
163        addr: impl tokio::net::ToSocketAddrs,
164        data_dir: &Path,
165        tier: QuantKind,
166        workers: usize,
167        mmap_tier: bool,
168    ) -> std::io::Result<Self> {
169        Self::bind_serve_full_mmap(addr, data_dir, tier, workers, mmap_tier, false).await
170    }
171
172    /// All-knobs serve mode. Adds `mmap_graph` to
173    /// [`bind_serve_full`](Self::bind_serve_full).
174    ///
175    /// # Errors
176    ///
177    /// Returns an error if the address cannot be bound or the shard cannot start.
178    pub async fn bind_serve_full_mmap(
179        addr: impl tokio::net::ToSocketAddrs,
180        data_dir: &Path,
181        tier: QuantKind,
182        workers: usize,
183        mmap_tier: bool,
184        mmap_graph: bool,
185    ) -> std::io::Result<Self> {
186        let listener = TcpListener::bind(addr).await?;
187        let shards =
188            ShardSet::open_mode_full_mmap(data_dir, 1, true, tier, workers, mmap_tier, mmap_graph)?;
189        Ok(Self {
190            listener,
191            shards,
192            tenant_backend: None,
193        })
194    }
195
196    /// Return the local address the server is listening on.
197    ///
198    /// # Errors
199    ///
200    /// Returns an error if the OS cannot retrieve the socket address.
201    pub fn local_addr(&self) -> std::io::Result<SocketAddr> {
202        self.listener.local_addr()
203    }
204
205    /// Number of shards backing this server.
206    #[must_use]
207    pub fn n_shards(&self) -> usize {
208        self.shards.n_shards()
209    }
210
211    /// Accept connections and handle them until an I/O error on `accept`.
212    ///
213    /// # Errors
214    ///
215    /// Returns the first error from `TcpListener::accept`.
216    pub async fn run(self) -> std::io::Result<()> {
217        let Self {
218            listener,
219            shards,
220            tenant_backend: _,
221        } = self;
222        info!(addr = ?listener.local_addr()?, n_shards = shards.n_shards(), "server listening (binary protocol)");
223        loop {
224            let (stream, _) = listener.accept().await?;
225            tune_socket(&stream);
226            let shards = shards.clone();
227            tokio::spawn(async move {
228                handle_connection(stream, shards).await;
229            });
230        }
231    }
232
233    /// Like `run`, but speaks RESP3 (Redis wire) on the listener instead of
234    /// the skeg binary protocol. Same shard set, same storage, different
235    /// encoding.
236    ///
237    /// # Errors
238    ///
239    /// Returns the first error from `TcpListener::accept`.
240    pub async fn run_resp3(self) -> std::io::Result<()> {
241        let Self {
242            listener,
243            shards,
244            tenant_backend,
245        } = self;
246        info!(
247            addr = ?listener.local_addr()?,
248            n_shards = shards.n_shards(),
249            tenant = tenant_backend.is_some(),
250            "server listening (RESP3)"
251        );
252        loop {
253            let (stream, _) = listener.accept().await?;
254            tune_socket(&stream);
255            let shards = shards.clone();
256            let backend = tenant_backend.clone();
257            tokio::spawn(async move {
258                handle_connection_resp3(stream, shards, backend).await;
259            });
260        }
261    }
262}
263
264/// Apply per-connection socket tuning: `TCP_NODELAY` for low-latency
265/// request/reply traffic, and `SO_KEEPALIVE` + `TCP_KEEPIDLE` so a
266/// half-open connection (peer dropped without FIN) gets detected
267/// before the OS default of ~2h. Failures here are logged and
268/// swallowed because they don't prevent the connection from working
269/// (they just degrade tail-case behaviour).
270fn tune_socket(stream: &TcpStream) {
271    if let Err(e) = stream.set_nodelay(true) {
272        warn!("set_nodelay failed: {e}");
273    }
274    let sock = socket2::SockRef::from(stream);
275    let ka = socket2::TcpKeepalive::new()
276        .with_time(Duration::from_secs(60))
277        .with_interval(Duration::from_secs(10));
278    if let Err(e) = sock.set_tcp_keepalive(&ka) {
279        warn!("set_tcp_keepalive failed: {e}");
280    }
281}