Skip to main content

skeg_server/
lib.rs

1#![deny(unsafe_code)]
2// Server::bind_* funcs take many tuning knobs (workers, mmap flags,
3// shard count, ...). Wrapping them in a config struct would push the
4// complexity to every call site without a real win.
5#![allow(clippy::too_many_arguments)]
6
7//! `skeg-server` - TCP server library.
8//!
9//! Ships single-tenant by default. A separate crate (see the
10//! `tenant` module docs) can install a multi-tenant layer at runtime
11//! via [`Server::with_tenant_backend`].
12
13pub mod handler;
14pub mod quota;
15pub mod resp3_handler;
16pub mod shard;
17pub mod tenant;
18#[cfg(feature = "tracing-otlp")]
19pub mod tracing_otlp;
20
21use std::net::SocketAddr;
22use std::path::Path;
23use std::sync::Arc;
24use std::time::Duration;
25use tokio::net::{TcpListener, TcpStream};
26use tracing::{info, warn};
27
28use handler::handle_connection;
29pub use quota::{TenantLimits, TenantVectorQuota};
30use resp3_handler::handle_connection_resp3;
31use shard::ShardSet;
32use skeg_vector::QuantKind;
33pub use tenant::{AnonymousPolicy, QuotaAdminError, TenantBackend, TenantId};
34
35pub struct Server {
36    listener: TcpListener,
37    shards: ShardSet,
38    /// Optional multi-tenant backend. `None` keeps single-tenant
39    /// semantics; wiring an `Arc<dyn TenantBackend>` enables RESP3
40    /// AUTH + per-tenant key scoping on this listener.
41    tenant_backend: Option<Arc<dyn TenantBackend>>,
42}
43
44impl Server {
45    /// Bind the server to `addr` with data sharded under `data_dir`.
46    ///
47    /// The shard count equals the number of performance cores.
48    ///
49    /// # Errors
50    ///
51    /// Returns an error if the address cannot be bound or a shard cannot start.
52    pub async fn bind(
53        addr: impl tokio::net::ToSocketAddrs,
54        data_dir: &Path,
55    ) -> std::io::Result<Self> {
56        let n_shards = skeg_platform::num_performance_cores();
57        Self::bind_full(addr, data_dir, n_shards, 0, false).await
58    }
59
60    /// Bind the server with an explicit shard count and worker-pool size.
61    ///
62    /// `workers == 0` (default) keeps VSEARCH inline on the shard thread
63    /// (Personal AI default). `workers > 0` dispatches VSEARCH to a tokio
64    /// blocking pool so KV ops do not queue behind multi-ms vector searches
65    /// (multi-tenant pattern, opt-in via `--workers N` on the CLI).
66    ///
67    /// # Errors
68    ///
69    /// Returns an error if the address cannot be bound or a shard cannot start.
70    pub async fn bind_with_shards(
71        addr: impl tokio::net::ToSocketAddrs,
72        data_dir: &Path,
73        n_shards: usize,
74        workers: usize,
75    ) -> std::io::Result<Self> {
76        Self::bind_full(addr, data_dir, n_shards, workers, false).await
77    }
78
79    /// Full-knob constructor for the read-write path: shard count, worker
80    /// pool, and the opt-in `mmap_tier` flag (see `--tier-mmap` in the
81    /// server CLI). Other entry points delegate here with defaults.
82    ///
83    /// # Errors
84    ///
85    /// Returns an error if the address cannot be bound or a shard cannot start.
86    pub async fn bind_full(
87        addr: impl tokio::net::ToSocketAddrs,
88        data_dir: &Path,
89        n_shards: usize,
90        workers: usize,
91        mmap_tier: bool,
92    ) -> std::io::Result<Self> {
93        Self::bind_full_mmap(addr, data_dir, n_shards, workers, mmap_tier, false).await
94    }
95
96    /// All-knobs constructor for the read-write path. Adds `mmap_graph`
97    /// to [`bind_full`](Self::bind_full).
98    ///
99    /// # Errors
100    ///
101    /// Returns an error if the address cannot be bound or a shard cannot start.
102    pub async fn bind_full_mmap(
103        addr: impl tokio::net::ToSocketAddrs,
104        data_dir: &Path,
105        n_shards: usize,
106        workers: usize,
107        mmap_tier: bool,
108        mmap_graph: bool,
109    ) -> std::io::Result<Self> {
110        let listener = TcpListener::bind(addr).await?;
111        let shards = ShardSet::open_mode_full_mmap(
112            data_dir,
113            n_shards,
114            false,
115            QuantKind::Int8,
116            workers,
117            mmap_tier,
118            mmap_graph,
119        )?;
120        Ok(Self {
121            listener,
122            shards,
123            tenant_backend: None,
124        })
125    }
126
127    /// Install a multi-tenant backend on a server already built by one
128    /// of the `bind*` constructors. Builder-style for clarity at the
129    /// call site. When set, the RESP3 handler honours `HELLO 3 AUTH`
130    /// and scopes KV / vector ops by tenant id.
131    #[must_use]
132    pub fn with_tenant_backend(mut self, backend: Arc<dyn TenantBackend>) -> Self {
133        self.tenant_backend = Some(backend);
134        self
135    }
136
137    /// Bind the server in serve mode: a single shard over the offline-built
138    /// index at `data_dir`, read-only. Every mutation (KV and vector) is
139    /// rejected; the index is served at its clean resident footprint.
140    ///
141    /// `data_dir` is a directory produced by `skeg-tool build`. `tier` is the
142    /// tier-1 quantisation built for the served index: `QuantKind::Int8` or
143    /// `QuantKind::Pq { m, k }` (smaller footprint). `workers > 0` enables
144    /// the VSEARCH dispatch pool described in [`bind_with_shards`].
145    ///
146    /// # Errors
147    ///
148    /// Returns an error if the address cannot be bound or the shard cannot
149    /// start.
150    pub async fn bind_serve(
151        addr: impl tokio::net::ToSocketAddrs,
152        data_dir: &Path,
153        tier: QuantKind,
154        workers: usize,
155    ) -> std::io::Result<Self> {
156        Self::bind_serve_full(addr, data_dir, tier, workers, false).await
157    }
158
159    /// Full-knob serve mode: like [`bind_serve`] plus the opt-in
160    /// `mmap_tier` flag that swaps the TurboQuant codes for a
161    /// memory-mapped view of `tier.cache.bin` at open.
162    ///
163    /// # Errors
164    ///
165    /// Returns an error if the address cannot be bound or the shard cannot start.
166    pub async fn bind_serve_full(
167        addr: impl tokio::net::ToSocketAddrs,
168        data_dir: &Path,
169        tier: QuantKind,
170        workers: usize,
171        mmap_tier: bool,
172    ) -> std::io::Result<Self> {
173        Self::bind_serve_full_mmap(addr, data_dir, tier, workers, mmap_tier, false).await
174    }
175
176    /// All-knobs serve mode. Adds `mmap_graph` to
177    /// [`bind_serve_full`](Self::bind_serve_full).
178    ///
179    /// # Errors
180    ///
181    /// Returns an error if the address cannot be bound or the shard cannot start.
182    pub async fn bind_serve_full_mmap(
183        addr: impl tokio::net::ToSocketAddrs,
184        data_dir: &Path,
185        tier: QuantKind,
186        workers: usize,
187        mmap_tier: bool,
188        mmap_graph: bool,
189    ) -> std::io::Result<Self> {
190        let listener = TcpListener::bind(addr).await?;
191        let shards =
192            ShardSet::open_mode_full_mmap(data_dir, 1, true, tier, workers, mmap_tier, mmap_graph)?;
193        Ok(Self {
194            listener,
195            shards,
196            tenant_backend: None,
197        })
198    }
199
200    /// Return the local address the server is listening on.
201    ///
202    /// # Errors
203    ///
204    /// Returns an error if the OS cannot retrieve the socket address.
205    pub fn local_addr(&self) -> std::io::Result<SocketAddr> {
206        self.listener.local_addr()
207    }
208
209    /// Number of shards backing this server.
210    #[must_use]
211    pub fn n_shards(&self) -> usize {
212        self.shards.n_shards()
213    }
214
215    /// Accept connections and handle them until an I/O error on `accept`.
216    ///
217    /// # Errors
218    ///
219    /// Returns the first error from `TcpListener::accept`.
220    pub async fn run(self) -> std::io::Result<()> {
221        let Self {
222            listener,
223            shards,
224            tenant_backend: _,
225        } = self;
226        info!(addr = ?listener.local_addr()?, n_shards = shards.n_shards(), "server listening (binary protocol)");
227        loop {
228            let (stream, _) = listener.accept().await?;
229            tune_socket(&stream);
230            let shards = shards.clone();
231            tokio::spawn(async move {
232                handle_connection(stream, shards).await;
233            });
234        }
235    }
236
237    /// Like `run`, but speaks RESP3 (Redis wire) on the listener instead of
238    /// the skeg binary protocol. Same shard set, same storage, different
239    /// encoding.
240    ///
241    /// # Errors
242    ///
243    /// Returns the first error from `TcpListener::accept`.
244    pub async fn run_resp3(self) -> std::io::Result<()> {
245        let Self {
246            listener,
247            shards,
248            tenant_backend,
249        } = self;
250        info!(
251            addr = ?listener.local_addr()?,
252            n_shards = shards.n_shards(),
253            tenant = tenant_backend.is_some(),
254            "server listening (RESP3)"
255        );
256        loop {
257            let (stream, _) = listener.accept().await?;
258            tune_socket(&stream);
259            let shards = shards.clone();
260            let backend = tenant_backend.clone();
261            tokio::spawn(async move {
262                handle_connection_resp3(stream, shards, backend).await;
263            });
264        }
265    }
266}
267
268/// Apply per-connection socket tuning: `TCP_NODELAY` for low-latency
269/// request/reply traffic, and `SO_KEEPALIVE` + `TCP_KEEPIDLE` so a
270/// half-open connection (peer dropped without FIN) gets detected
271/// before the OS default of ~2h. Failures here are logged and
272/// swallowed because they don't prevent the connection from working
273/// (they just degrade tail-case behaviour).
274fn tune_socket(stream: &TcpStream) {
275    if let Err(e) = stream.set_nodelay(true) {
276        warn!("set_nodelay failed: {e}");
277    }
278    let sock = socket2::SockRef::from(stream);
279    let ka = socket2::TcpKeepalive::new()
280        .with_time(Duration::from_secs(60))
281        .with_interval(Duration::from_secs(10));
282    if let Err(e) = sock.set_tcp_keepalive(&ka) {
283        warn!("set_tcp_keepalive failed: {e}");
284    }
285}