Skip to main content

rvf_server/
lib.rs

//! rvf-server -- RuVector Format TCP/HTTP streaming server.
//!
//! Provides a network-accessible interface to `rvf-runtime`,
//! supporting HTTP REST endpoints and a binary TCP streaming protocol
//! for inter-agent vector exchange.
6
7pub mod error;
8pub mod http;
9pub mod tcp;
10
11use std::sync::Arc;
12
13use tokio::sync::Mutex;
14
15use rvf_runtime::{RvfOptions, RvfStore};
16
17use crate::http::SharedStore;
18
/// Server configuration.
///
/// Holds the listen ports for the two network front-ends and the location
/// of the backing RVF data file. Use [`Default`] for a ready-made local
/// setup and override individual fields with struct-update syntax.
///
/// The type is plain data, so it derives equality in addition to `Clone`
/// and `Debug`; this lets callers and tests compare configurations directly.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ServerConfig {
    /// Port the HTTP server listens on.
    pub http_port: u16,
    /// Port the TCP streaming server listens on.
    pub tcp_port: u16,
    /// Path to the RVF data file that is opened, or created if missing.
    pub data_path: std::path::PathBuf,
    /// Vector dimension for a newly created store. Only consulted when the
    /// data file does not exist yet; ignored when opening an existing file.
    pub dimension: u16,
}
31
32impl Default for ServerConfig {
33    fn default() -> Self {
34        Self {
35            http_port: 8080,
36            tcp_port: 9090,
37            data_path: std::path::PathBuf::from("data.rvf"),
38            dimension: 128,
39        }
40    }
41}
42
43/// Open or create the store at the configured path, returning a shared handle.
44pub fn open_or_create_store(config: &ServerConfig) -> Result<SharedStore, rvf_types::RvfError> {
45    let path = &config.data_path;
46
47    let store = if path.exists() {
48        RvfStore::open(path)?
49    } else {
50        if let Some(parent) = path.parent() {
51            if !parent.exists() {
52                std::fs::create_dir_all(parent)
53                    .map_err(|_| rvf_types::RvfError::Code(rvf_types::ErrorCode::FsyncFailed))?;
54            }
55        }
56        let options = RvfOptions {
57            dimension: config.dimension,
58            ..Default::default()
59        };
60        RvfStore::create(path, options)?
61    };
62
63    Ok(Arc::new(Mutex::new(store)))
64}
65
66/// Start both HTTP and TCP servers. Blocks until shutdown.
67pub async fn run(config: ServerConfig) -> Result<(), Box<dyn std::error::Error>> {
68    let store = open_or_create_store(&config)
69        .map_err(|e| format!("failed to open store: {e}"))?;
70
71    let http_addr = format!("0.0.0.0:{}", config.http_port);
72    let tcp_addr = format!("0.0.0.0:{}", config.tcp_port);
73
74    let app = http::router(store.clone());
75    let listener = tokio::net::TcpListener::bind(&http_addr).await?;
76    eprintln!("rvf-server HTTP listening on {http_addr}");
77    eprintln!("rvf-server TCP  listening on {tcp_addr}");
78
79    tokio::select! {
80        result = axum::serve(listener, app) => {
81            result?;
82        }
83        result = tcp::serve_tcp(&tcp_addr, store) => {
84            result?;
85        }
86    }
87
88    Ok(())
89}