Skip to main content

laurus_server/
server.rs

1//! Server bootstrap logic.
2//!
3//! The [`run`] function initialises logging, opens or creates the search index,
4//! starts the gRPC server (and optionally the HTTP gateway), and waits for a
5//! shutdown signal (`Ctrl+C`).
6
7use std::net::SocketAddr;
8use std::sync::Arc;
9
10use tokio::net::TcpListener;
11use tokio::sync::RwLock;
12use tonic::transport::{Endpoint, Server};
13use tracing_subscriber::EnvFilter;
14
15use crate::config::Config;
16use crate::context;
17use crate::gateway;
18use crate::proto::laurus::v1::{
19    document_service_server::DocumentServiceServer, health_service_server::HealthServiceServer,
20    index_service_server::IndexServiceServer, search_service_server::SearchServiceServer,
21};
22use crate::service::{
23    document::DocumentService, health::HealthService, index::IndexService, search::SearchService,
24};
25
26/// Starts the server based on the given configuration.
27///
28/// If `http_port` is configured, both the gRPC server and HTTP Gateway are
29/// started concurrently. Otherwise, only the gRPC server is started.
30///
31/// The function blocks until a shutdown signal (`Ctrl+C`) is received.
32/// On shutdown it commits any pending index changes before exiting.
33///
34/// # Arguments
35///
36/// * `config` - Server, index, and logging configuration.
37///
38/// # Returns
39///
40/// `Ok(())` when the server shuts down cleanly.
41///
42/// # Errors
43///
44/// Returns an error if the address cannot be parsed, the gRPC server fails to
45/// start, or the HTTP gateway listener cannot bind.
46pub async fn run(config: &Config) -> anyhow::Result<()> {
47    // Initialize tracing using the RUST_LOG environment variable.
48    // Falls back to "info" when RUST_LOG is not set.
49    tracing_subscriber::fmt()
50        .with_env_filter(
51            EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info")),
52        )
53        .init();
54
55    tracing::info!("Laurus server starting");
56    tracing::info!("Data directory: {}", config.index.data_dir.display());
57
58    // Open an existing index. If none exists, start without an index.
59    let engine = match context::open_index(&config.index.data_dir).await {
60        Ok(engine) => {
61            tracing::info!("Opened existing index");
62            Some(engine)
63        }
64        Err(_) => {
65            tracing::info!("No existing index found. Use CreateIndex RPC to create one.");
66            None
67        }
68    };
69
70    let engine = Arc::new(RwLock::new(engine));
71    let data_dir = config.index.data_dir.clone();
72
73    let health_service = HealthService;
74    let document_service = DocumentService {
75        engine: engine.clone(),
76    };
77    let index_service = IndexService {
78        engine: engine.clone(),
79        data_dir,
80    };
81    let search_service = SearchService {
82        engine: engine.clone(),
83    };
84
85    let grpc_addr: SocketAddr = format!("{}:{}", config.server.host, config.server.port).parse()?;
86    tracing::info!("gRPC server listening on {grpc_addr}");
87
88    let grpc_server = Server::builder()
89        .add_service(HealthServiceServer::new(health_service))
90        .add_service(DocumentServiceServer::new(document_service))
91        .add_service(IndexServiceServer::new(index_service))
92        .add_service(SearchServiceServer::new(search_service));
93
94    if let Some(http_port) = config.server.http_port {
95        // Also start the gRPC Gateway (HTTP server) concurrently.
96        let channel = Endpoint::from_shared(format!("http://127.0.0.1:{}", config.server.port))?
97            .connect_lazy();
98        let gateway_state = gateway::GatewayState::new(channel);
99        let router = gateway::create_router(gateway_state);
100
101        let http_addr: SocketAddr = format!("{}:{}", config.server.host, http_port).parse()?;
102        let listener = TcpListener::bind(http_addr).await?;
103        tracing::info!("HTTP gateway listening on {http_addr}");
104
105        // Run the gRPC server and HTTP Gateway concurrently, shutting both down on Ctrl+C.
106        tokio::select! {
107            result = grpc_server.serve(grpc_addr) => {
108                if let Err(e) = result {
109                    tracing::error!("gRPC server error: {e}");
110                }
111            }
112            result = axum::serve(listener, router) => {
113                if let Err(e) = result {
114                    tracing::error!("HTTP gateway error: {e}");
115                }
116            }
117            _ = shutdown_signal(engine.clone()) => {}
118        }
119    } else {
120        // Start only the gRPC server (legacy behavior).
121        grpc_server
122            .serve_with_shutdown(grpc_addr, shutdown_signal(engine.clone()))
123            .await?;
124    }
125
126    Ok(())
127}
128
129/// Waits for a shutdown signal (Ctrl+C) and commits pending changes before exiting.
130async fn shutdown_signal(engine: Arc<RwLock<Option<laurus::Engine>>>) {
131    tokio::signal::ctrl_c()
132        .await
133        .expect("failed to install Ctrl+C handler");
134
135    tracing::info!("Shutdown signal received, committing pending changes...");
136
137    let guard = engine.read().await;
138    if let Some(engine) = guard.as_ref() {
139        if let Err(e) = engine.commit().await {
140            tracing::error!("Failed to commit on shutdown: {e}");
141        } else {
142            tracing::info!("Committed successfully");
143        }
144    }
145
146    tracing::info!("Server shutting down");
147}