1use std::net::SocketAddr;
8use std::sync::Arc;
9
10use tokio::net::TcpListener;
11use tokio::sync::RwLock;
12use tonic::transport::{Endpoint, Server};
13use tracing_subscriber::EnvFilter;
14
15use crate::config::Config;
16use crate::context;
17use crate::gateway;
18use crate::proto::laurus::v1::{
19 document_service_server::DocumentServiceServer, health_service_server::HealthServiceServer,
20 index_service_server::IndexServiceServer, search_service_server::SearchServiceServer,
21};
22use crate::service::{
23 document::DocumentService, health::HealthService, index::IndexService, search::SearchService,
24};
25
26pub async fn run(config: &Config) -> anyhow::Result<()> {
47 tracing_subscriber::fmt()
50 .with_env_filter(
51 EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info")),
52 )
53 .init();
54
55 tracing::info!("Laurus server starting");
56 tracing::info!("Data directory: {}", config.index.data_dir.display());
57
58 let engine = match context::open_index(&config.index.data_dir).await {
60 Ok(engine) => {
61 tracing::info!("Opened existing index");
62 Some(engine)
63 }
64 Err(_) => {
65 tracing::info!("No existing index found. Use CreateIndex RPC to create one.");
66 None
67 }
68 };
69
70 let engine = Arc::new(RwLock::new(engine));
71 let data_dir = config.index.data_dir.clone();
72
73 let health_service = HealthService;
74 let document_service = DocumentService {
75 engine: engine.clone(),
76 };
77 let index_service = IndexService {
78 engine: engine.clone(),
79 data_dir,
80 };
81 let search_service = SearchService {
82 engine: engine.clone(),
83 };
84
85 let grpc_addr: SocketAddr = format!("{}:{}", config.server.host, config.server.port).parse()?;
86 tracing::info!("gRPC server listening on {grpc_addr}");
87
88 let grpc_server = Server::builder()
89 .add_service(HealthServiceServer::new(health_service))
90 .add_service(DocumentServiceServer::new(document_service))
91 .add_service(IndexServiceServer::new(index_service))
92 .add_service(SearchServiceServer::new(search_service));
93
94 if let Some(http_port) = config.server.http_port {
95 let channel = Endpoint::from_shared(format!("http://127.0.0.1:{}", config.server.port))?
97 .connect_lazy();
98 let gateway_state = gateway::GatewayState::new(channel);
99 let router = gateway::create_router(gateway_state);
100
101 let http_addr: SocketAddr = format!("{}:{}", config.server.host, http_port).parse()?;
102 let listener = TcpListener::bind(http_addr).await?;
103 tracing::info!("HTTP gateway listening on {http_addr}");
104
105 tokio::select! {
107 result = grpc_server.serve(grpc_addr) => {
108 if let Err(e) = result {
109 tracing::error!("gRPC server error: {e}");
110 }
111 }
112 result = axum::serve(listener, router) => {
113 if let Err(e) = result {
114 tracing::error!("HTTP gateway error: {e}");
115 }
116 }
117 _ = shutdown_signal(engine.clone()) => {}
118 }
119 } else {
120 grpc_server
122 .serve_with_shutdown(grpc_addr, shutdown_signal(engine.clone()))
123 .await?;
124 }
125
126 Ok(())
127}
128
129async fn shutdown_signal(engine: Arc<RwLock<Option<laurus::Engine>>>) {
131 tokio::signal::ctrl_c()
132 .await
133 .expect("failed to install Ctrl+C handler");
134
135 tracing::info!("Shutdown signal received, committing pending changes...");
136
137 let guard = engine.read().await;
138 if let Some(engine) = guard.as_ref() {
139 if let Err(e) = engine.commit().await {
140 tracing::error!("Failed to commit on shutdown: {e}");
141 } else {
142 tracing::info!("Committed successfully");
143 }
144 }
145
146 tracing::info!("Server shutting down");
147}