warpdrive_proxy/server/
mod.rs

1//! Server module - Pingora HTTP server initialization and lifecycle
2//!
3//! This module handles:
4//! - Pingora server initialization
5//! - HTTP/HTTPS listener configuration
6//! - Server lifecycle (startup, shutdown)
7//! - Thread pool configuration
8//!
9//! Based on Pingora's quick start guide and examples.
10
11use anyhow::{Context, Result, anyhow};
12use pingora::apps::HttpServerOptions;
13use pingora::prelude::*;
14use pingora::server::Server;
15use pingora::server::configuration::Opt;
16use std::net::TcpListener;
17use std::sync::Arc;
18use tracing::info;
19
20use crate::acme::{AcmeProvisioner, ChallengeStore};
21use crate::config::Config;
22use crate::proxy::WarpDriveProxy;
23
24/// Validate that a port can be bound
25///
26/// This performs a pre-flight check to ensure the port is available before Pingora tries to bind.
27/// If binding fails, we provide helpful error messages with common causes and solutions.
28fn validate_port_binding(port: u16, protocol: &str) -> Result<()> {
29    let addr = format!("0.0.0.0:{}", port);
30
31    match TcpListener::bind(&addr) {
32        Ok(_listener) => {
33            // Port is available (listener will be dropped immediately)
34            Ok(())
35        }
36        Err(err) => {
37            // Build helpful error message based on error kind
38            let mut error_msg = format!("Failed to bind {} listener to {}", protocol, addr);
39            let mut suggestions = Vec::new();
40
41            match err.kind() {
42                std::io::ErrorKind::PermissionDenied => {
43                    error_msg.push_str("\n\n❌ Permission denied");
44
45                    if port < 1024 {
46                        suggestions.push(format!(
47                            "Port {} requires root privileges or CAP_NET_BIND_SERVICE capability",
48                            port
49                        ));
50                        suggestions.push("Solutions:".to_string());
51                        suggestions.push("  1. Use an unprivileged port (>= 1024), e.g., 8080 for HTTP, 8443 for HTTPS".to_string());
52                        suggestions.push(format!(
53                            "     Set WARPDRIVE_HTTP_PORT={} or WARPDRIVE_HTTPS_PORT=8443",
54                            if port == 80 { "8080" } else { "8443" }
55                        ));
56                        suggestions.push(
57                            "  2. Run with sudo (NOT recommended for production)".to_string(),
58                        );
59                        suggestions.push("  3. Grant capability: sudo setcap 'cap_net_bind_service=+ep' $(which warpdrive)".to_string());
60                    } else {
61                        suggestions
62                            .push("This port should not require elevated privileges".to_string());
63                        suggestions.push("Check file system permissions or security policies (SELinux, AppArmor)".to_string());
64                    }
65                }
66                std::io::ErrorKind::AddrInUse => {
67                    error_msg.push_str("\n\n❌ Address already in use");
68                    suggestions.push(format!(
69                        "Another process is already listening on port {}",
70                        port
71                    ));
72                    suggestions.push("Solutions:".to_string());
73                    suggestions.push("  1. Stop the conflicting process".to_string());
74                    suggestions.push(format!(
75                        "     Check what's using the port: lsof -i :{} or netstat -tulpn | grep {}",
76                        port, port
77                    ));
78                    suggestions.push(format!(
79                        "  2. Use a different port (set WARPDRIVE_{}_PORT=<port>)",
80                        if protocol == "HTTP" { "HTTP" } else { "HTTPS" }
81                    ));
82                    suggestions.push(
83                        "  3. In Docker/Kubernetes, ensure no other container is using this port"
84                            .to_string(),
85                    );
86                }
87                std::io::ErrorKind::AddrNotAvailable => {
88                    error_msg.push_str("\n\n❌ Address not available");
89                    suggestions.push(
90                        "The IP address 0.0.0.0 cannot be bound (network configuration issue)"
91                            .to_string(),
92                    );
93                    suggestions.push("Check network interface configuration".to_string());
94                }
95                _ => {
96                    error_msg.push_str(&format!("\n\n❌ {}", err));
97                    suggestions
98                        .push("This may be a network configuration or OS-level issue".to_string());
99                }
100            }
101
102            // Add environment-specific suggestions
103            suggestions.push("".to_string());
104            suggestions.push("Environment-specific tips:".to_string());
105            suggestions.push(
106                "  • Docker: Ensure -p flag matches container port (e.g., -p 8080:8080)"
107                    .to_string(),
108            );
109            suggestions.push("  • AWS ECS: Check port mappings in task definition".to_string());
110            suggestions.push(
111                "  • Kubernetes: Verify containerPort in pod spec matches config".to_string(),
112            );
113
114            for suggestion in suggestions {
115                error_msg.push('\n');
116                error_msg.push_str(&suggestion);
117            }
118
119            Err(anyhow!(error_msg))
120        }
121    }
122}
123
124/// Start the WarpDrive proxy server
125///
126/// This initializes a Pingora server with the WarpDrive proxy handler,
127/// configures HTTP listeners based on the config, and runs the server.
128///
129/// This function blocks forever (until a shutdown signal is received).
130/// Pingora creates its own runtime internally, so this must be called
131/// from a non-async context.
132///
133/// # Errors
134///
135/// Returns an error if:
136/// - Server initialization fails
137/// - Port binding fails
138/// - Server runtime encounters an error
139pub fn start_server(config: Config) -> Result<()> {
140    info!("Starting WarpDrive server");
141
142    // Create shared config reference
143    let config = Arc::new(config);
144
145    // Create Pingora server configuration
146    // Using default options for now (can be customized via Opt struct)
147    let mut server = Server::new(Some(Opt {
148        upgrade: false,
149        daemon: false,
150        nocapture: false,
151        test: false,
152        conf: None,
153    }))
154    .context("Failed to create Pingora server")?;
155
156    // Bootstrap the server (sets up signal handlers, etc.)
157    server.bootstrap();
158
159    info!("Server bootstrapped successfully");
160
161    // Validate port bindings before proceeding
162    // This provides clear error messages if ports are unavailable
163    info!("Validating port bindings...");
164    validate_port_binding(config.http_port, "HTTP").context("HTTP port validation failed")?;
165
166    if config.has_manual_tls() || config.has_acme_domains() {
167        validate_port_binding(config.https_port, "HTTPS")
168            .context("HTTPS port validation failed")?;
169    }
170
171    info!("Port validation successful");
172
173    // Create router if TOML config is present
174    let router = if let Some(ref toml_config) = config.toml_config {
175        Some(
176            crate::router::Router::from_config(toml_config)
177                .map_err(|e| anyhow::anyhow!("Failed to create router: {}", e))?,
178        )
179    } else {
180        None
181    };
182
183    // Initialize ACME challenge store
184    let challenge_store = ChallengeStore::default();
185
186    // Initialize ACME provisioner if ACME domains are configured
187    if config.has_acme_domains() {
188        info!("Initializing ACME certificate provisioner");
189
190        // Create a runtime for async ACME operations
191        let runtime =
192            tokio::runtime::Runtime::new().context("Failed to create Tokio runtime for ACME")?;
193
194        runtime.block_on(async {
195            let provisioner = AcmeProvisioner::new(config.clone(), challenge_store.clone())
196                .context("Failed to create ACME provisioner")?;
197
198            // Provision certificates for all ACME domains
199            for domain in &config.tls_domains {
200                info!("Provisioning certificate for domain: {}", domain);
201                match provisioner.provision_certificate(domain).await {
202                    Ok((cert_path, key_path)) => {
203                        info!("Certificate provisioned successfully for {}", domain);
204                        info!("  Certificate: {}", cert_path);
205                        info!("  Private key: {}", key_path);
206                    }
207                    Err(e) => {
208                        return Err(anyhow::anyhow!(
209                            "Failed to provision certificate for {}: {}",
210                            domain,
211                            e
212                        ));
213                    }
214                }
215            }
216            Ok::<(), anyhow::Error>(())
217        })?;
218    }
219
220    // Create proxy service
221    let proxy = WarpDriveProxy::new(config.clone(), router, challenge_store);
222
223    // Create the proxy service
224    // This wraps our ProxyHttp implementation in Pingora's service infrastructure
225    let mut proxy_service = http_proxy_service(&server.configuration, proxy);
226
227    // Enable HTTP/2 cleartext (h2c) support if configured
228    // This allows WarpDrive to accept HTTP/2 over plain TCP (no TLS) for AWS ALB and Cloud Run
229    if config.h2c_enabled {
230        info!("Enabling HTTP/2 cleartext (h2c) support");
231        if let Some(app_logic) = proxy_service.app_logic_mut() {
232            let mut server_opts = HttpServerOptions::default();
233            server_opts.h2c = true;
234            app_logic.server_options = Some(server_opts);
235        }
236    }
237
238    // Add HTTP listener
239    let http_addr = format!("0.0.0.0:{}", config.http_port);
240    info!("Adding HTTP listener on {}", http_addr);
241    proxy_service.add_tcp(&http_addr);
242
243    // Add HTTPS listener when TLS is configured (manual cert/key paths)
244    if config.has_manual_tls() {
245        let https_addr = format!("0.0.0.0:{}", config.https_port);
246        let cert_path = config.tls_cert_path.as_ref().unwrap();
247        let key_path = config.tls_key_path.as_ref().unwrap();
248
249        info!("Adding HTTPS listener on {}", https_addr);
250        info!("  Certificate: {}", cert_path);
251        info!("  Private key: {}", key_path);
252
253        proxy_service
254            .add_tls(&https_addr, cert_path, key_path)
255            .context("Failed to add TLS listener")?;
256    }
257
258    // Add the proxy service to the server
259    server.add_service(proxy_service);
260
261    info!("WarpDrive proxy server ready");
262    info!("  HTTP listening on: {}", http_addr);
263    if config.has_manual_tls() {
264        info!("  HTTPS listening on: 0.0.0.0:{}", config.https_port);
265    }
266    info!(
267        "  Proxying to: {}:{}",
268        config.target_host, config.target_port
269    );
270
271    // Run the server forever
272    // This blocks until a shutdown signal is received (SIGTERM, SIGINT)
273    info!("Server started successfully");
274    server.run_forever()
275}
276
277#[cfg(test)]
278mod tests {
279    use super::*;
280    use std::net::TcpListener;
281
282    #[test]
283    fn test_server_config() {
284        let config = Config::default();
285        assert_eq!(config.http_port, 8080);
286        assert_eq!(config.target_port, 3000);
287    }
288
289    #[test]
290    fn test_port_validation_success() {
291        // Find an available port by binding and immediately dropping
292        let listener = TcpListener::bind("127.0.0.1:0").unwrap();
293        let port = listener.local_addr().unwrap().port();
294        drop(listener);
295
296        // Validation should succeed on available port
297        let result = validate_port_binding(port, "HTTP");
298        assert!(
299            result.is_ok(),
300            "Validation should succeed for available port"
301        );
302    }
303
304    #[test]
305    fn test_port_validation_address_in_use() {
306        // Bind to 0.0.0.0 (same as validation function) and keep it occupied
307        let listener = TcpListener::bind("0.0.0.0:0").unwrap();
308        let port = listener.local_addr().unwrap().port();
309
310        // Validation should fail with "Address already in use"
311        let result = validate_port_binding(port, "HTTP");
312        assert!(result.is_err(), "Validation should fail for occupied port");
313
314        let err = result.unwrap_err();
315        let err_str = format!("{:?}", err);
316        assert!(
317            err_str.contains("Address already in use"),
318            "Error should mention 'Address already in use'"
319        );
320        assert!(
321            err_str.contains("lsof -i"),
322            "Error should suggest using lsof"
323        );
324        assert!(
325            err_str.contains("WARPDRIVE_HTTP_PORT"),
326            "Error should mention WARPDRIVE_HTTP_PORT"
327        );
328    }
329}