#![deny(unsafe_code)]
#![allow(clippy::too_many_arguments)]
pub mod handler;
pub mod quota;
pub mod resp3_handler;
pub mod shard;
pub mod tenant;
#[cfg(feature = "tracing-otlp")]
pub mod tracing_otlp;
use std::net::SocketAddr;
use std::path::Path;
use std::sync::Arc;
use std::time::Duration;
use tokio::net::{TcpListener, TcpStream};
use tracing::{info, warn};
use handler::handle_connection;
pub use quota::{TenantLimits, TenantVectorQuota};
use resp3_handler::handle_connection_resp3;
use shard::ShardSet;
use skeg_vector::QuantKind;
pub use tenant::{AnonymousPolicy, QuotaAdminError, TenantBackend, TenantId};
pub struct Server {
listener: TcpListener,
shards: ShardSet,
tenant_backend: Option<Arc<dyn TenantBackend>>,
}
impl Server {
pub async fn bind(
addr: impl tokio::net::ToSocketAddrs,
data_dir: &Path,
) -> std::io::Result<Self> {
let n_shards = skeg_platform::num_performance_cores();
Self::bind_full(addr, data_dir, n_shards, 0, false).await
}
pub async fn bind_with_shards(
addr: impl tokio::net::ToSocketAddrs,
data_dir: &Path,
n_shards: usize,
workers: usize,
) -> std::io::Result<Self> {
Self::bind_full(addr, data_dir, n_shards, workers, false).await
}
pub async fn bind_full(
addr: impl tokio::net::ToSocketAddrs,
data_dir: &Path,
n_shards: usize,
workers: usize,
mmap_tier: bool,
) -> std::io::Result<Self> {
Self::bind_full_mmap(addr, data_dir, n_shards, workers, mmap_tier, false).await
}
pub async fn bind_full_mmap(
addr: impl tokio::net::ToSocketAddrs,
data_dir: &Path,
n_shards: usize,
workers: usize,
mmap_tier: bool,
mmap_graph: bool,
) -> std::io::Result<Self> {
let listener = TcpListener::bind(addr).await?;
let shards = ShardSet::open_mode_full_mmap(
data_dir,
n_shards,
false,
QuantKind::Int8,
workers,
mmap_tier,
mmap_graph,
)?;
Ok(Self {
listener,
shards,
tenant_backend: None,
})
}
#[must_use]
pub fn with_tenant_backend(mut self, backend: Arc<dyn TenantBackend>) -> Self {
self.tenant_backend = Some(backend);
self
}
pub async fn bind_serve(
addr: impl tokio::net::ToSocketAddrs,
data_dir: &Path,
tier: QuantKind,
workers: usize,
) -> std::io::Result<Self> {
Self::bind_serve_full(addr, data_dir, tier, workers, false).await
}
pub async fn bind_serve_full(
addr: impl tokio::net::ToSocketAddrs,
data_dir: &Path,
tier: QuantKind,
workers: usize,
mmap_tier: bool,
) -> std::io::Result<Self> {
Self::bind_serve_full_mmap(addr, data_dir, tier, workers, mmap_tier, false).await
}
pub async fn bind_serve_full_mmap(
addr: impl tokio::net::ToSocketAddrs,
data_dir: &Path,
tier: QuantKind,
workers: usize,
mmap_tier: bool,
mmap_graph: bool,
) -> std::io::Result<Self> {
let listener = TcpListener::bind(addr).await?;
let shards =
ShardSet::open_mode_full_mmap(data_dir, 1, true, tier, workers, mmap_tier, mmap_graph)?;
Ok(Self {
listener,
shards,
tenant_backend: None,
})
}
pub fn local_addr(&self) -> std::io::Result<SocketAddr> {
self.listener.local_addr()
}
#[must_use]
pub fn n_shards(&self) -> usize {
self.shards.n_shards()
}
pub async fn run(self) -> std::io::Result<()> {
let Self {
listener,
shards,
tenant_backend: _,
} = self;
info!(addr = ?listener.local_addr()?, n_shards = shards.n_shards(), "server listening (binary protocol)");
loop {
let (stream, _) = listener.accept().await?;
tune_socket(&stream);
let shards = shards.clone();
tokio::spawn(async move {
handle_connection(stream, shards).await;
});
}
}
pub async fn run_resp3(self) -> std::io::Result<()> {
let Self {
listener,
shards,
tenant_backend,
} = self;
info!(
addr = ?listener.local_addr()?,
n_shards = shards.n_shards(),
tenant = tenant_backend.is_some(),
"server listening (RESP3)"
);
loop {
let (stream, _) = listener.accept().await?;
tune_socket(&stream);
let shards = shards.clone();
let backend = tenant_backend.clone();
tokio::spawn(async move {
handle_connection_resp3(stream, shards, backend).await;
});
}
}
}
fn tune_socket(stream: &TcpStream) {
if let Err(e) = stream.set_nodelay(true) {
warn!("set_nodelay failed: {e}");
}
let sock = socket2::SockRef::from(stream);
let ka = socket2::TcpKeepalive::new()
.with_time(Duration::from_secs(60))
.with_interval(Duration::from_secs(10));
if let Err(e) = sock.set_tcp_keepalive(&ka) {
warn!("set_tcp_keepalive failed: {e}");
}
}