// fraiseql_server/server/lifecycle.rs

//! Server lifecycle: serve, `serve_with_shutdown`, and `shutdown_signal`.

use tokio::net::TcpListener;
use tracing::{error, info, warn};

use super::{DatabaseAdapter, Result, Server, ServerError, TlsSetup};
8impl<A: DatabaseAdapter + Clone + Send + Sync + 'static> Server<A> {
9    /// Start server and listen for requests.
10    ///
11    /// Uses SIGUSR1-aware shutdown signal when a schema path is configured,
12    /// enabling zero-downtime schema reloads via `kill -USR1 <pid>`.
13    ///
14    /// # Errors
15    ///
16    /// Returns error if server fails to bind or encounters runtime errors.
17    pub async fn serve(self) -> Result<()> {
18        self.serve_with_shutdown(Self::shutdown_signal()).await
19    }
20
21    /// Start server with a custom shutdown future.
22    ///
23    /// Enables programmatic shutdown (e.g., for `--watch` hot-reload) by accepting any
24    /// future that resolves when the server should stop.
25    ///
26    /// # Errors
27    ///
28    /// Returns error if server fails to bind or encounters runtime errors.
29    #[allow(clippy::cognitive_complexity)] // Reason: server lifecycle with TLS/non-TLS binding, signal handling, and graceful shutdown
30    pub async fn serve_with_shutdown<F>(self, shutdown: F) -> Result<()>
31    where
32        F: std::future::Future<Output = ()> + Send + 'static,
33    {
34        // Ensure RBAC schema exists before the router mounts RBAC endpoints.
35        // Must run here (async context) rather than inside build_router() (sync).
36        #[cfg(feature = "observers")]
37        if let Some(ref db_pool) = self.db_pool {
38            if self.config.admin_token.is_some() {
39                let rbac_backend =
40                    crate::api::rbac_management::db_backend::RbacDbBackend::new(db_pool.clone());
41                rbac_backend.ensure_schema().await.map_err(|e| {
42                    ServerError::ConfigError(format!("Failed to initialize RBAC schema: {e}"))
43                })?;
44            }
45        }
46
47        let (app, app_state) = self.build_router();
48
49        // Spawn SIGUSR1 schema reload handler when running on Unix.
50        // The handler loops forever, reloading on each signal, until the
51        // server process exits.
52        #[cfg(unix)]
53        if let Some(ref schema_path) = app_state.schema_path {
54            let reload_state = app_state.clone();
55            let reload_path = schema_path.clone();
56            tokio::spawn(async move {
57                let mut sigusr1 = match tokio::signal::unix::signal(
58                    tokio::signal::unix::SignalKind::user_defined1(),
59                ) {
60                    Ok(s) => s,
61                    Err(e) => {
62                        warn!(error = %e, "Failed to install SIGUSR1 handler — schema hot-reload disabled");
63                        return;
64                    },
65                };
66                loop {
67                    sigusr1.recv().await;
68                    info!(
69                        path = %reload_path.display(),
70                        "Received SIGUSR1 — reloading schema"
71                    );
72                    match reload_state.reload_schema(&reload_path).await {
73                        Ok(()) => {
74                            let hash = reload_state.executor().schema().content_hash();
75                            reload_state
76                                .metrics
77                                .schema_reloads_total
78                                .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
79                            info!(schema_hash = %hash, "Schema reloaded successfully via SIGUSR1");
80                        },
81                        Err(e) => {
82                            reload_state
83                                .metrics
84                                .schema_reload_errors_total
85                                .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
86                            error!(
87                                error = %e,
88                                path = %reload_path.display(),
89                                "Schema reload failed via SIGUSR1 — keeping previous schema"
90                            );
91                        },
92                    }
93                }
94            });
95            info!(
96                path = %schema_path.display(),
97                "SIGUSR1 schema reload handler installed"
98            );
99        }
100
101        // Initialize TLS setup
102        let tls_setup = TlsSetup::new(self.config.tls.clone(), self.config.database_tls.clone())?;
103
104        info!(
105            bind_addr = %self.config.bind_addr,
106            graphql_path = %self.config.graphql_path,
107            tls_enabled = tls_setup.is_tls_enabled(),
108            "Starting FraiseQL server"
109        );
110
111        // Start observer runtime if configured
112        #[cfg(feature = "observers")]
113        if let Some(ref runtime) = self.observer_runtime {
114            info!("Starting observer runtime...");
115            let mut guard = runtime.write().await;
116
117            match guard.start().await {
118                Ok(()) => info!("Observer runtime started"),
119                Err(e) => {
120                    error!("Failed to start observer runtime: {}", e);
121                    warn!("Server will continue without observers");
122                },
123            }
124            drop(guard);
125        }
126
127        let listener = TcpListener::bind(self.config.bind_addr)
128            .await
129            .map_err(|e| ServerError::BindError(e.to_string()))?;
130
131        // Warn if the process file descriptor limit is below the recommended minimum.
132        // A low limit causes "too many open files" errors under load.
133        #[cfg(target_os = "linux")]
134        {
135            if let Ok(limits) = std::fs::read_to_string("/proc/self/limits") {
136                for line in limits.lines() {
137                    if line.starts_with("Max open files") {
138                        let parts: Vec<&str> = line.split_whitespace().collect();
139                        if let Some(soft) = parts.get(3) {
140                            if let Ok(n) = soft.parse::<u64>() {
141                                if n < 65_536 {
142                                    warn!(
143                                        current_fd_limit = n,
144                                        recommended = 65_536,
145                                        "File descriptor limit is low; consider raising ulimit -n"
146                                    );
147                                }
148                            }
149                        }
150                        break;
151                    }
152                }
153            }
154        }
155
156        // Log TLS configuration
157        if tls_setup.is_tls_enabled() {
158            // Verify TLS setup is valid (will error if certificates are missing/invalid)
159            let _ = tls_setup.create_rustls_config()?;
160            info!(
161                cert_path = ?tls_setup.cert_path(),
162                key_path = ?tls_setup.key_path(),
163                mtls_required = tls_setup.is_mtls_required(),
164                "Server TLS configuration loaded (note: use reverse proxy for server-side TLS termination)"
165            );
166        }
167
168        // Log database TLS configuration
169        info!(
170            postgres_ssl_mode = tls_setup.postgres_ssl_mode(),
171            redis_ssl = tls_setup.redis_ssl_enabled(),
172            clickhouse_https = tls_setup.clickhouse_https_enabled(),
173            elasticsearch_https = tls_setup.elasticsearch_https_enabled(),
174            "Database connection TLS configuration applied"
175        );
176
177        info!("Server listening on http://{}", self.config.bind_addr);
178
179        // Start both HTTP and gRPC servers concurrently if Arrow Flight is enabled
180        #[cfg(feature = "arrow")]
181        if let Some(flight_service) = self.flight_service {
182            let flight_addr = self.config.flight_bind_addr;
183            info!("Arrow Flight server listening on grpc://{}", flight_addr);
184
185            // Spawn Flight server in background
186            let flight_server = tokio::spawn(async move {
187                tonic::transport::Server::builder()
188                    .add_service(flight_service.into_server())
189                    .serve(flight_addr)
190                    .await
191            });
192
193            // Wrap the user-supplied shutdown future so we can also stop observer runtime
194            #[cfg(feature = "observers")]
195            let observer_runtime = self.observer_runtime.clone();
196
197            let shutdown_with_cleanup = async move {
198                shutdown.await;
199                #[cfg(feature = "observers")]
200                if let Some(ref runtime) = observer_runtime {
201                    info!("Shutting down observer runtime");
202                    let mut guard = runtime.write().await;
203                    if let Err(e) = guard.stop().await {
204                        #[cfg(feature = "observers")]
205                        error!("Error stopping runtime: {}", e);
206                    } else {
207                        info!("Runtime stopped cleanly");
208                    }
209                }
210            };
211
212            // Run HTTP server with graceful shutdown
213            axum::serve(listener, app)
214                .with_graceful_shutdown(shutdown_with_cleanup)
215                .await
216                .map_err(|e| ServerError::IoError(std::io::Error::other(e)))?;
217
218            // Abort Flight server after HTTP server exits
219            flight_server.abort();
220        }
221
222        // HTTP-only server (when arrow feature not enabled)
223        #[cfg(not(feature = "arrow"))]
224        {
225            axum::serve(listener, app)
226                .with_graceful_shutdown(shutdown)
227                .await
228                .map_err(|e| ServerError::IoError(std::io::Error::other(e)))?;
229
230            let shutdown_timeout =
231                std::time::Duration::from_secs(self.config.shutdown_timeout_secs);
232            info!(
233                timeout_secs = self.config.shutdown_timeout_secs,
234                "HTTP server stopped, draining remaining work"
235            );
236
237            let drain = tokio::time::timeout(shutdown_timeout, async {
238                #[cfg(feature = "observers")]
239                if let Some(ref runtime) = self.observer_runtime {
240                    let mut guard = runtime.write().await;
241                    match guard.stop().await {
242                        Ok(()) => info!("Observer runtime stopped cleanly"),
243                        Err(e) => warn!("Observer runtime shutdown error: {e}"),
244                    }
245                }
246            })
247            .await;
248
249            if drain.is_err() {
250                warn!(
251                    timeout_secs = self.config.shutdown_timeout_secs,
252                    "Shutdown drain timed out; forcing exit"
253                );
254            } else {
255                info!("Graceful shutdown complete");
256            }
257        }
258
259        Ok(())
260    }
261
262    /// Start server on an externally created listener.
263    ///
264    /// Used in tests to discover the bound port before serving.
265    /// Skips TLS, Flight, and observer startup — suitable for unit/integration tests only.
266    ///
267    /// # Errors
268    ///
269    /// Returns error if the server encounters a runtime error.
270    pub async fn serve_on_listener<F>(self, listener: TcpListener, shutdown: F) -> Result<()>
271    where
272        F: std::future::Future<Output = ()> + Send + 'static,
273    {
274        let (app, _app_state) = self.build_router();
275        axum::serve(listener, app)
276            .with_graceful_shutdown(shutdown)
277            .await
278            .map_err(|e| ServerError::IoError(std::io::Error::other(e)))?;
279        Ok(())
280    }
281
282    /// Listen for shutdown signals (Ctrl+C or SIGTERM)
283    pub async fn shutdown_signal() {
284        use tokio::signal;
285
286        let ctrl_c = async {
287            match signal::ctrl_c().await {
288                Ok(()) => {},
289                Err(e) => {
290                    warn!(error = %e, "Failed to install Ctrl+C handler");
291                    std::future::pending::<()>().await;
292                },
293            }
294        };
295
296        #[cfg(unix)]
297        let terminate = async {
298            match signal::unix::signal(signal::unix::SignalKind::terminate()) {
299                Ok(mut s) => {
300                    s.recv().await;
301                },
302                Err(e) => {
303                    warn!(error = %e, "Failed to install SIGTERM handler");
304                    std::future::pending::<()>().await;
305                },
306            }
307        };
308
309        #[cfg(not(unix))]
310        let terminate = std::future::pending::<()>();
311
312        tokio::select! {
313            () = ctrl_c => info!("Received Ctrl+C"),
314            () = terminate => info!("Received SIGTERM"),
315        }
316    }
317}