// fraiseql_server/server/lifecycle.rs

use tokio::net::TcpListener;
use tracing::{error, info, warn};

use super::{DatabaseAdapter, Result, Server, ServerError, TlsSetup};
8impl<A: DatabaseAdapter + Clone + Send + Sync + 'static> Server<A> {
9 pub async fn serve(self) -> Result<()> {
18 self.serve_with_shutdown(Self::shutdown_signal()).await
19 }
20
21 #[allow(clippy::cognitive_complexity)] pub async fn serve_with_shutdown<F>(self, shutdown: F) -> Result<()>
31 where
32 F: std::future::Future<Output = ()> + Send + 'static,
33 {
34 #[cfg(feature = "observers")]
37 if let Some(ref db_pool) = self.db_pool {
38 if self.config.admin_token.is_some() {
39 let rbac_backend =
40 crate::api::rbac_management::db_backend::RbacDbBackend::new(db_pool.clone());
41 rbac_backend.ensure_schema().await.map_err(|e| {
42 ServerError::ConfigError(format!("Failed to initialize RBAC schema: {e}"))
43 })?;
44 }
45 }
46
47 let (app, app_state) = self.build_router();
48
49 #[cfg(unix)]
53 if let Some(ref schema_path) = app_state.schema_path {
54 let reload_state = app_state.clone();
55 let reload_path = schema_path.clone();
56 tokio::spawn(async move {
57 let mut sigusr1 = match tokio::signal::unix::signal(
58 tokio::signal::unix::SignalKind::user_defined1(),
59 ) {
60 Ok(s) => s,
61 Err(e) => {
62 warn!(error = %e, "Failed to install SIGUSR1 handler — schema hot-reload disabled");
63 return;
64 },
65 };
66 loop {
67 sigusr1.recv().await;
68 info!(
69 path = %reload_path.display(),
70 "Received SIGUSR1 — reloading schema"
71 );
72 match reload_state.reload_schema(&reload_path).await {
73 Ok(()) => {
74 let hash = reload_state.executor().schema().content_hash();
75 reload_state
76 .metrics
77 .schema_reloads_total
78 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
79 info!(schema_hash = %hash, "Schema reloaded successfully via SIGUSR1");
80 },
81 Err(e) => {
82 reload_state
83 .metrics
84 .schema_reload_errors_total
85 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
86 error!(
87 error = %e,
88 path = %reload_path.display(),
89 "Schema reload failed via SIGUSR1 — keeping previous schema"
90 );
91 },
92 }
93 }
94 });
95 info!(
96 path = %schema_path.display(),
97 "SIGUSR1 schema reload handler installed"
98 );
99 }
100
101 let tls_setup = TlsSetup::new(self.config.tls.clone(), self.config.database_tls.clone())?;
103
104 info!(
105 bind_addr = %self.config.bind_addr,
106 graphql_path = %self.config.graphql_path,
107 tls_enabled = tls_setup.is_tls_enabled(),
108 "Starting FraiseQL server"
109 );
110
111 #[cfg(feature = "observers")]
113 if let Some(ref runtime) = self.observer_runtime {
114 info!("Starting observer runtime...");
115 let mut guard = runtime.write().await;
116
117 match guard.start().await {
118 Ok(()) => info!("Observer runtime started"),
119 Err(e) => {
120 error!("Failed to start observer runtime: {}", e);
121 warn!("Server will continue without observers");
122 },
123 }
124 drop(guard);
125 }
126
127 let listener = TcpListener::bind(self.config.bind_addr)
128 .await
129 .map_err(|e| ServerError::BindError(e.to_string()))?;
130
131 #[cfg(target_os = "linux")]
134 {
135 if let Ok(limits) = std::fs::read_to_string("/proc/self/limits") {
136 for line in limits.lines() {
137 if line.starts_with("Max open files") {
138 let parts: Vec<&str> = line.split_whitespace().collect();
139 if let Some(soft) = parts.get(3) {
140 if let Ok(n) = soft.parse::<u64>() {
141 if n < 65_536 {
142 warn!(
143 current_fd_limit = n,
144 recommended = 65_536,
145 "File descriptor limit is low; consider raising ulimit -n"
146 );
147 }
148 }
149 }
150 break;
151 }
152 }
153 }
154 }
155
156 if tls_setup.is_tls_enabled() {
158 let _ = tls_setup.create_rustls_config()?;
160 info!(
161 cert_path = ?tls_setup.cert_path(),
162 key_path = ?tls_setup.key_path(),
163 mtls_required = tls_setup.is_mtls_required(),
164 "Server TLS configuration loaded (note: use reverse proxy for server-side TLS termination)"
165 );
166 }
167
168 info!(
170 postgres_ssl_mode = tls_setup.postgres_ssl_mode(),
171 redis_ssl = tls_setup.redis_ssl_enabled(),
172 clickhouse_https = tls_setup.clickhouse_https_enabled(),
173 elasticsearch_https = tls_setup.elasticsearch_https_enabled(),
174 "Database connection TLS configuration applied"
175 );
176
177 info!("Server listening on http://{}", self.config.bind_addr);
178
179 #[cfg(feature = "arrow")]
181 if let Some(flight_service) = self.flight_service {
182 let flight_addr = self.config.flight_bind_addr;
183 info!("Arrow Flight server listening on grpc://{}", flight_addr);
184
185 let flight_server = tokio::spawn(async move {
187 tonic::transport::Server::builder()
188 .add_service(flight_service.into_server())
189 .serve(flight_addr)
190 .await
191 });
192
193 #[cfg(feature = "observers")]
195 let observer_runtime = self.observer_runtime.clone();
196
197 let shutdown_with_cleanup = async move {
198 shutdown.await;
199 #[cfg(feature = "observers")]
200 if let Some(ref runtime) = observer_runtime {
201 info!("Shutting down observer runtime");
202 let mut guard = runtime.write().await;
203 if let Err(e) = guard.stop().await {
204 #[cfg(feature = "observers")]
205 error!("Error stopping runtime: {}", e);
206 } else {
207 info!("Runtime stopped cleanly");
208 }
209 }
210 };
211
212 axum::serve(listener, app)
214 .with_graceful_shutdown(shutdown_with_cleanup)
215 .await
216 .map_err(|e| ServerError::IoError(std::io::Error::other(e)))?;
217
218 flight_server.abort();
220 }
221
222 #[cfg(not(feature = "arrow"))]
224 {
225 axum::serve(listener, app)
226 .with_graceful_shutdown(shutdown)
227 .await
228 .map_err(|e| ServerError::IoError(std::io::Error::other(e)))?;
229
230 let shutdown_timeout =
231 std::time::Duration::from_secs(self.config.shutdown_timeout_secs);
232 info!(
233 timeout_secs = self.config.shutdown_timeout_secs,
234 "HTTP server stopped, draining remaining work"
235 );
236
237 let drain = tokio::time::timeout(shutdown_timeout, async {
238 #[cfg(feature = "observers")]
239 if let Some(ref runtime) = self.observer_runtime {
240 let mut guard = runtime.write().await;
241 match guard.stop().await {
242 Ok(()) => info!("Observer runtime stopped cleanly"),
243 Err(e) => warn!("Observer runtime shutdown error: {e}"),
244 }
245 }
246 })
247 .await;
248
249 if drain.is_err() {
250 warn!(
251 timeout_secs = self.config.shutdown_timeout_secs,
252 "Shutdown drain timed out; forcing exit"
253 );
254 } else {
255 info!("Graceful shutdown complete");
256 }
257 }
258
259 Ok(())
260 }
261
262 pub async fn serve_on_listener<F>(self, listener: TcpListener, shutdown: F) -> Result<()>
271 where
272 F: std::future::Future<Output = ()> + Send + 'static,
273 {
274 let (app, _app_state) = self.build_router();
275 axum::serve(listener, app)
276 .with_graceful_shutdown(shutdown)
277 .await
278 .map_err(|e| ServerError::IoError(std::io::Error::other(e)))?;
279 Ok(())
280 }
281
282 pub async fn shutdown_signal() {
284 use tokio::signal;
285
286 let ctrl_c = async {
287 match signal::ctrl_c().await {
288 Ok(()) => {},
289 Err(e) => {
290 warn!(error = %e, "Failed to install Ctrl+C handler");
291 std::future::pending::<()>().await;
292 },
293 }
294 };
295
296 #[cfg(unix)]
297 let terminate = async {
298 match signal::unix::signal(signal::unix::SignalKind::terminate()) {
299 Ok(mut s) => {
300 s.recv().await;
301 },
302 Err(e) => {
303 warn!(error = %e, "Failed to install SIGTERM handler");
304 std::future::pending::<()>().await;
305 },
306 }
307 };
308
309 #[cfg(not(unix))]
310 let terminate = std::future::pending::<()>();
311
312 tokio::select! {
313 () = ctrl_c => info!("Received Ctrl+C"),
314 () = terminate => info!("Received SIGTERM"),
315 }
316 }
317}