fraiseql_server/server/
lifecycle.rs1use axum::serve::ListenerExt;
4use tokio::net::TcpListener;
5use tracing::{error, info, warn};
6
7use super::{DatabaseAdapter, Result, Server, ServerError, TlsSetup};
8
9impl<A: DatabaseAdapter + Clone + Send + Sync + 'static> Server<A> {
10 pub async fn serve(self) -> Result<()> {
19 self.serve_with_shutdown(Self::shutdown_signal()).await
20 }
21
22 #[allow(clippy::cognitive_complexity)] pub async fn serve_with_shutdown<F>(self, shutdown: F) -> Result<()>
32 where
33 F: std::future::Future<Output = ()> + Send + 'static,
34 {
35 #[cfg(feature = "observers")]
38 if let Some(ref db_pool) = self.db_pool {
39 if self.config.admin_token.is_some() {
40 let rbac_backend =
41 crate::api::rbac_management::db_backend::RbacDbBackend::new(db_pool.clone());
42 rbac_backend.ensure_schema().await.map_err(|e| {
43 ServerError::ConfigError(format!("Failed to initialize RBAC schema: {e}"))
44 })?;
45 }
46 }
47
48 let (app, app_state) = self.build_router();
49
50 #[cfg(unix)]
54 if let Some(ref schema_path) = app_state.schema_path {
55 let reload_state = app_state.clone();
56 let reload_path = schema_path.clone();
57 tokio::spawn(async move {
58 let mut sigusr1 = match tokio::signal::unix::signal(
59 tokio::signal::unix::SignalKind::user_defined1(),
60 ) {
61 Ok(s) => s,
62 Err(e) => {
63 warn!(error = %e, "Failed to install SIGUSR1 handler — schema hot-reload disabled");
64 return;
65 },
66 };
67 loop {
68 sigusr1.recv().await;
69 info!(
70 path = %reload_path.display(),
71 "Received SIGUSR1 — reloading schema"
72 );
73 match reload_state.reload_schema(&reload_path).await {
74 Ok(()) => {
75 let hash = reload_state.executor().schema().content_hash();
76 reload_state
77 .metrics
78 .schema_reloads_total
79 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
80 info!(schema_hash = %hash, "Schema reloaded successfully via SIGUSR1");
81 },
82 Err(e) => {
83 reload_state
84 .metrics
85 .schema_reload_errors_total
86 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
87 error!(
88 error = %e,
89 path = %reload_path.display(),
90 "Schema reload failed via SIGUSR1 — keeping previous schema"
91 );
92 },
93 }
94 }
95 });
96 info!(
97 path = %schema_path.display(),
98 "SIGUSR1 schema reload handler installed"
99 );
100 }
101
102 let tls_setup = TlsSetup::new(self.config.tls.clone(), self.config.database_tls.clone())?;
104
105 info!(
106 bind_addr = %self.config.bind_addr,
107 graphql_path = %self.config.graphql_path,
108 tls_enabled = tls_setup.is_tls_enabled(),
109 "Starting FraiseQL server"
110 );
111
112 #[cfg(feature = "observers")]
114 if let Some(ref runtime) = self.observer_runtime {
115 info!("Starting observer runtime...");
116 let mut guard = runtime.write().await;
117
118 match guard.start().await {
119 Ok(()) => info!("Observer runtime started"),
120 Err(e) => {
121 error!("Failed to start observer runtime: {}", e);
122 warn!("Server will continue without observers");
123 },
124 }
125 drop(guard);
126 }
127
128 let listener = TcpListener::bind(self.config.bind_addr)
131 .await
132 .map_err(|e| ServerError::BindError(e.to_string()))?
133 .tap_io(|tcp_stream| {
134 if let Err(err) = tcp_stream.set_nodelay(true) {
135 warn!("failed to set TCP_NODELAY: {err:#}");
136 }
137 });
138
139 #[cfg(target_os = "linux")]
142 {
143 if let Ok(limits) = std::fs::read_to_string("/proc/self/limits") {
144 for line in limits.lines() {
145 if line.starts_with("Max open files") {
146 let parts: Vec<&str> = line.split_whitespace().collect();
147 if let Some(soft) = parts.get(3) {
148 if let Ok(n) = soft.parse::<u64>() {
149 if n < 65_536 {
150 warn!(
151 current_fd_limit = n,
152 recommended = 65_536,
153 "File descriptor limit is low; consider raising ulimit -n"
154 );
155 }
156 }
157 }
158 break;
159 }
160 }
161 }
162 }
163
164 if tls_setup.is_tls_enabled() {
166 let _ = tls_setup.create_rustls_config()?;
168 info!(
169 cert_path = ?tls_setup.cert_path(),
170 key_path = ?tls_setup.key_path(),
171 mtls_required = tls_setup.is_mtls_required(),
172 "Server TLS configuration loaded (note: use reverse proxy for server-side TLS termination)"
173 );
174 }
175
176 info!(
178 postgres_ssl_mode = tls_setup.postgres_ssl_mode(),
179 redis_ssl = tls_setup.redis_ssl_enabled(),
180 clickhouse_https = tls_setup.clickhouse_https_enabled(),
181 elasticsearch_https = tls_setup.elasticsearch_https_enabled(),
182 "Database connection TLS configuration applied"
183 );
184
185 info!("Server listening on http://{}", self.config.bind_addr);
186
187 #[cfg(feature = "arrow")]
189 if let Some(flight_service) = self.flight_service {
190 let flight_addr = self.config.flight_bind_addr;
191 info!("Arrow Flight server listening on grpc://{}", flight_addr);
192
193 let flight_server = tokio::spawn(async move {
195 tonic::transport::Server::builder()
196 .add_service(flight_service.into_server())
197 .serve(flight_addr)
198 .await
199 });
200
201 #[cfg(feature = "observers")]
203 let observer_runtime = self.observer_runtime.clone();
204
205 let shutdown_with_cleanup = async move {
206 shutdown.await;
207 #[cfg(feature = "observers")]
208 if let Some(ref runtime) = observer_runtime {
209 info!("Shutting down observer runtime");
210 let mut guard = runtime.write().await;
211 if let Err(e) = guard.stop().await {
212 #[cfg(feature = "observers")]
213 error!("Error stopping runtime: {}", e);
214 } else {
215 info!("Runtime stopped cleanly");
216 }
217 }
218 };
219
220 axum::serve(listener, app)
222 .with_graceful_shutdown(shutdown_with_cleanup)
223 .await
224 .map_err(|e| ServerError::IoError(std::io::Error::other(e)))?;
225
226 flight_server.abort();
228 }
229
230 #[cfg(not(feature = "arrow"))]
232 {
233 axum::serve(listener, app)
234 .with_graceful_shutdown(shutdown)
235 .await
236 .map_err(|e| ServerError::IoError(std::io::Error::other(e)))?;
237
238 let shutdown_timeout =
239 std::time::Duration::from_secs(self.config.shutdown_timeout_secs);
240 info!(
241 timeout_secs = self.config.shutdown_timeout_secs,
242 "HTTP server stopped, draining remaining work"
243 );
244
245 let drain = tokio::time::timeout(shutdown_timeout, async {
246 #[cfg(feature = "observers")]
247 if let Some(ref runtime) = self.observer_runtime {
248 let mut guard = runtime.write().await;
249 match guard.stop().await {
250 Ok(()) => info!("Observer runtime stopped cleanly"),
251 Err(e) => warn!("Observer runtime shutdown error: {e}"),
252 }
253 }
254 })
255 .await;
256
257 if drain.is_err() {
258 warn!(
259 timeout_secs = self.config.shutdown_timeout_secs,
260 "Shutdown drain timed out; forcing exit"
261 );
262 } else {
263 info!("Graceful shutdown complete");
264 }
265 }
266
267 Ok(())
268 }
269
270 pub async fn serve_on_listener<F>(self, listener: TcpListener, shutdown: F) -> Result<()>
279 where
280 F: std::future::Future<Output = ()> + Send + 'static,
281 {
282 let (app, _app_state) = self.build_router();
283 axum::serve(listener, app)
284 .with_graceful_shutdown(shutdown)
285 .await
286 .map_err(|e| ServerError::IoError(std::io::Error::other(e)))?;
287 Ok(())
288 }
289
290 pub async fn shutdown_signal() {
292 use tokio::signal;
293
294 let ctrl_c = async {
295 match signal::ctrl_c().await {
296 Ok(()) => {},
297 Err(e) => {
298 warn!(error = %e, "Failed to install Ctrl+C handler");
299 std::future::pending::<()>().await;
300 },
301 }
302 };
303
304 #[cfg(unix)]
305 let terminate = async {
306 match signal::unix::signal(signal::unix::SignalKind::terminate()) {
307 Ok(mut s) => {
308 s.recv().await;
309 },
310 Err(e) => {
311 warn!(error = %e, "Failed to install SIGTERM handler");
312 std::future::pending::<()>().await;
313 },
314 }
315 };
316
317 #[cfg(not(unix))]
318 let terminate = std::future::pending::<()>();
319
320 tokio::select! {
321 () = ctrl_c => info!("Received Ctrl+C"),
322 () = terminate => info!("Received SIGTERM"),
323 }
324 }
325}