// fraiseql_server/server/lifecycle.rs
1//! Server lifecycle: serve, `serve_with_shutdown`, and `shutdown_signal`.
2
3use axum::serve::ListenerExt;
4use tokio::net::TcpListener;
5use tracing::{error, info, warn};
6
7use super::{DatabaseAdapter, Result, Server, ServerError, TlsSetup};
8
9impl<A: DatabaseAdapter + Clone + Send + Sync + 'static> Server<A> {
10    /// Start server and listen for requests.
11    ///
12    /// Uses SIGUSR1-aware shutdown signal when a schema path is configured,
13    /// enabling zero-downtime schema reloads via `kill -USR1 <pid>`.
14    ///
15    /// # Errors
16    ///
17    /// Returns error if server fails to bind or encounters runtime errors.
18    pub async fn serve(self) -> Result<()> {
19        self.serve_with_shutdown(Self::shutdown_signal()).await
20    }
21
    /// Start server with a custom shutdown future.
    ///
    /// Enables programmatic shutdown (e.g., for `--watch` hot-reload) by accepting any
    /// future that resolves when the server should stop.
    ///
    /// Startup order (visible below): RBAC schema init (needs an async context) →
    /// router build → SIGUSR1 reload handler (Unix only) → TLS validation →
    /// observer runtime start → TCP bind → serve with graceful shutdown.
    ///
    /// # Errors
    ///
    /// Returns error if server fails to bind or encounters runtime errors.
    #[allow(clippy::cognitive_complexity)] // Reason: server lifecycle with TLS/non-TLS binding, signal handling, and graceful shutdown
    pub async fn serve_with_shutdown<F>(self, shutdown: F) -> Result<()>
    where
        F: std::future::Future<Output = ()> + Send + 'static,
    {
        // Ensure RBAC schema exists before the router mounts RBAC endpoints.
        // Must run here (async context) rather than inside build_router() (sync).
        // Only runs when a DB pool exists AND an admin token is configured.
        #[cfg(feature = "observers")]
        if let Some(ref db_pool) = self.db_pool {
            if self.config.admin_token.is_some() {
                let rbac_backend =
                    crate::api::rbac_management::db_backend::RbacDbBackend::new(db_pool.clone());
                rbac_backend.ensure_schema().await.map_err(|e| {
                    ServerError::ConfigError(format!("Failed to initialize RBAC schema: {e}"))
                })?;
            }
        }

        let (app, app_state) = self.build_router();

        // Spawn SIGUSR1 schema reload handler when running on Unix.
        // The handler loops forever, reloading on each signal, until the
        // server process exits.
        #[cfg(unix)]
        if let Some(ref schema_path) = app_state.schema_path {
            let reload_state = app_state.clone();
            let reload_path = schema_path.clone();
            tokio::spawn(async move {
                // Handler installation can fail; in that case hot-reload is
                // simply disabled rather than aborting server startup.
                let mut sigusr1 = match tokio::signal::unix::signal(
                    tokio::signal::unix::SignalKind::user_defined1(),
                ) {
                    Ok(s) => s,
                    Err(e) => {
                        warn!(error = %e, "Failed to install SIGUSR1 handler — schema hot-reload disabled");
                        return;
                    },
                };
                loop {
                    sigusr1.recv().await;
                    info!(
                        path = %reload_path.display(),
                        "Received SIGUSR1 — reloading schema"
                    );
                    match reload_state.reload_schema(&reload_path).await {
                        Ok(()) => {
                            let hash = reload_state.executor().schema().content_hash();
                            reload_state
                                .metrics
                                .schema_reloads_total
                                .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
                            info!(schema_hash = %hash, "Schema reloaded successfully via SIGUSR1");
                        },
                        Err(e) => {
                            // Reload failure is non-fatal: the error counter is
                            // bumped and the previously loaded schema stays active.
                            reload_state
                                .metrics
                                .schema_reload_errors_total
                                .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
                            error!(
                                error = %e,
                                path = %reload_path.display(),
                                "Schema reload failed via SIGUSR1 — keeping previous schema"
                            );
                        },
                    }
                }
            });
            info!(
                path = %schema_path.display(),
                "SIGUSR1 schema reload handler installed"
            );
        }

        // Initialize TLS setup
        let tls_setup = TlsSetup::new(self.config.tls.clone(), self.config.database_tls.clone())?;

        info!(
            bind_addr = %self.config.bind_addr,
            graphql_path = %self.config.graphql_path,
            tls_enabled = tls_setup.is_tls_enabled(),
            "Starting FraiseQL server"
        );

        // Start observer runtime if configured.
        // Startup failure is deliberately non-fatal: the server keeps running
        // without observers (logged at error/warn level below).
        #[cfg(feature = "observers")]
        if let Some(ref runtime) = self.observer_runtime {
            info!("Starting observer runtime...");
            let mut guard = runtime.write().await;

            match guard.start().await {
                Ok(()) => info!("Observer runtime started"),
                Err(e) => {
                    error!("Failed to start observer runtime: {}", e);
                    warn!("Server will continue without observers");
                },
            }
            drop(guard);
        }

        // Explicitly enable TCP_NODELAY (disable Nagle's algorithm) on every
        // accepted connection to minimise latency for small GraphQL responses.
        let listener = TcpListener::bind(self.config.bind_addr)
            .await
            .map_err(|e| ServerError::BindError(e.to_string()))?
            .tap_io(|tcp_stream| {
                if let Err(err) = tcp_stream.set_nodelay(true) {
                    warn!("failed to set TCP_NODELAY: {err:#}");
                }
            });

        // Warn if the process file descriptor limit is below the recommended minimum.
        // A low limit causes "too many open files" errors under load.
        #[cfg(target_os = "linux")]
        {
            if let Ok(limits) = std::fs::read_to_string("/proc/self/limits") {
                for line in limits.lines() {
                    if line.starts_with("Max open files") {
                        // /proc/self/limits row format puts the soft limit in
                        // whitespace-separated column index 3 here — TODO confirm
                        // against a sample /proc/self/limits line.
                        let parts: Vec<&str> = line.split_whitespace().collect();
                        if let Some(soft) = parts.get(3) {
                            if let Ok(n) = soft.parse::<u64>() {
                                if n < 65_536 {
                                    warn!(
                                        current_fd_limit = n,
                                        recommended = 65_536,
                                        "File descriptor limit is low; consider raising ulimit -n"
                                    );
                                }
                            }
                        }
                        break;
                    }
                }
            }
        }

        // Log TLS configuration
        if tls_setup.is_tls_enabled() {
            // Verify TLS setup is valid (will error if certificates are missing/invalid)
            let _ = tls_setup.create_rustls_config()?;
            info!(
                cert_path = ?tls_setup.cert_path(),
                key_path = ?tls_setup.key_path(),
                mtls_required = tls_setup.is_mtls_required(),
                "Server TLS configuration loaded (note: use reverse proxy for server-side TLS termination)"
            );
        }

        // Log database TLS configuration
        info!(
            postgres_ssl_mode = tls_setup.postgres_ssl_mode(),
            redis_ssl = tls_setup.redis_ssl_enabled(),
            clickhouse_https = tls_setup.clickhouse_https_enabled(),
            elasticsearch_https = tls_setup.elasticsearch_https_enabled(),
            "Database connection TLS configuration applied"
        );

        info!("Server listening on http://{}", self.config.bind_addr);

        // Start both HTTP and gRPC servers concurrently if Arrow Flight is enabled.
        // NOTE(review): if the "arrow" feature is compiled in but
        // `self.flight_service` is None, neither this branch nor the
        // `cfg(not(feature = "arrow"))` branch below executes, so the function
        // returns without ever serving HTTP — confirm flight_service is always
        // Some when the feature is enabled.
        #[cfg(feature = "arrow")]
        if let Some(flight_service) = self.flight_service {
            let flight_addr = self.config.flight_bind_addr;
            info!("Arrow Flight server listening on grpc://{}", flight_addr);

            // Spawn Flight server in background
            let flight_server = tokio::spawn(async move {
                tonic::transport::Server::builder()
                    .add_service(flight_service.into_server())
                    .serve(flight_addr)
                    .await
            });

            // Wrap the user-supplied shutdown future so we can also stop observer runtime
            #[cfg(feature = "observers")]
            let observer_runtime = self.observer_runtime.clone();

            // NOTE(review): unlike the non-arrow path below, this path stops the
            // observer runtime without a shutdown-timeout guard — confirm the
            // asymmetry is intentional.
            let shutdown_with_cleanup = async move {
                shutdown.await;
                #[cfg(feature = "observers")]
                if let Some(ref runtime) = observer_runtime {
                    info!("Shutting down observer runtime");
                    let mut guard = runtime.write().await;
                    if let Err(e) = guard.stop().await {
                        // NOTE(review): this inner cfg gate is redundant — the
                        // enclosing block is already gated on "observers".
                        #[cfg(feature = "observers")]
                        error!("Error stopping runtime: {}", e);
                    } else {
                        info!("Runtime stopped cleanly");
                    }
                }
            };

            // Run HTTP server with graceful shutdown
            axum::serve(listener, app)
                .with_graceful_shutdown(shutdown_with_cleanup)
                .await
                .map_err(|e| ServerError::IoError(std::io::Error::other(e)))?;

            // Abort Flight server after HTTP server exits
            flight_server.abort();
        }

        // HTTP-only server (when arrow feature not enabled)
        #[cfg(not(feature = "arrow"))]
        {
            axum::serve(listener, app)
                .with_graceful_shutdown(shutdown)
                .await
                .map_err(|e| ServerError::IoError(std::io::Error::other(e)))?;

            // After the HTTP server stops, give remaining work (observer
            // runtime shutdown) a bounded window before forcing exit.
            let shutdown_timeout =
                std::time::Duration::from_secs(self.config.shutdown_timeout_secs);
            info!(
                timeout_secs = self.config.shutdown_timeout_secs,
                "HTTP server stopped, draining remaining work"
            );

            let drain = tokio::time::timeout(shutdown_timeout, async {
                #[cfg(feature = "observers")]
                if let Some(ref runtime) = self.observer_runtime {
                    let mut guard = runtime.write().await;
                    match guard.stop().await {
                        Ok(()) => info!("Observer runtime stopped cleanly"),
                        Err(e) => warn!("Observer runtime shutdown error: {e}"),
                    }
                }
            })
            .await;

            if drain.is_err() {
                warn!(
                    timeout_secs = self.config.shutdown_timeout_secs,
                    "Shutdown drain timed out; forcing exit"
                );
            } else {
                info!("Graceful shutdown complete");
            }
        }

        Ok(())
    }
269
270    /// Start server on an externally created listener.
271    ///
272    /// Used in tests to discover the bound port before serving.
273    /// Skips TLS, Flight, and observer startup — suitable for unit/integration tests only.
274    ///
275    /// # Errors
276    ///
277    /// Returns error if the server encounters a runtime error.
278    pub async fn serve_on_listener<F>(self, listener: TcpListener, shutdown: F) -> Result<()>
279    where
280        F: std::future::Future<Output = ()> + Send + 'static,
281    {
282        let (app, _app_state) = self.build_router();
283        axum::serve(listener, app)
284            .with_graceful_shutdown(shutdown)
285            .await
286            .map_err(|e| ServerError::IoError(std::io::Error::other(e)))?;
287        Ok(())
288    }
289
290    /// Listen for shutdown signals (Ctrl+C or SIGTERM)
291    pub async fn shutdown_signal() {
292        use tokio::signal;
293
294        let ctrl_c = async {
295            match signal::ctrl_c().await {
296                Ok(()) => {},
297                Err(e) => {
298                    warn!(error = %e, "Failed to install Ctrl+C handler");
299                    std::future::pending::<()>().await;
300                },
301            }
302        };
303
304        #[cfg(unix)]
305        let terminate = async {
306            match signal::unix::signal(signal::unix::SignalKind::terminate()) {
307                Ok(mut s) => {
308                    s.recv().await;
309                },
310                Err(e) => {
311                    warn!(error = %e, "Failed to install SIGTERM handler");
312                    std::future::pending::<()>().await;
313                },
314            }
315        };
316
317        #[cfg(not(unix))]
318        let terminate = std::future::pending::<()>();
319
320        tokio::select! {
321            () = ctrl_c => info!("Received Ctrl+C"),
322            () = terminate => info!("Received SIGTERM"),
323        }
324    }
325}