Skip to main content

orca_proxy/
lib.rs

1//! Reverse proxy with HTTP routing for containers and Wasm trigger dispatch.
2//!
3//! Routes HTTP traffic by `Host` header to container backends (round-robin),
4//! and by path pattern to Wasm component invocations via a callback.
5//! Supports automatic TLS via ACME/Let's Encrypt (Caddy-style zero-config).
6
7pub mod acme;
8mod forward;
9mod handler;
10pub mod rate_limit;
11mod routing;
12pub mod sni;
13pub mod tls;
14mod websocket;
15
16pub use orca_core::config::FallbackConfig;
17
18use std::collections::HashMap;
19use std::sync::Arc;
20use std::sync::atomic::AtomicUsize;
21
22use hyper::Request;
23use hyper::body::Incoming;
24use hyper::server::conn::http1;
25use hyper::service::service_fn;
26use hyper_util::rt::TokioIo;
27use tokio::net::TcpListener;
28use tokio::sync::RwLock;
29use tracing::{debug, error, info, warn};
30
31use acme::AcmeManager;
32use handler::{handle_acme_challenge, handle_request};
33use rate_limit::RateLimiter;
34
35/// A backend target for container routing.
36#[derive(Debug, Clone)]
37pub struct RouteTarget {
38    /// Address in the form `ip:port`.
39    pub address: String,
40    /// Owning service name.
41    pub service_name: String,
42    /// Optional path pattern (e.g., `"/api/*"`). When `None`, this target is a
43    /// catch-all for the domain. When `Some`, only requests whose path matches
44    /// the pattern are routed here. Longest-prefix match wins.
45    pub path_pattern: Option<String>,
46    /// Traffic weight (1-100, default 100). Used for weighted routing
47    /// during canary deployments. Higher weight = more traffic.
48    pub weight: u32,
49    /// Prefix to strip from the request path before forwarding upstream,
50    /// e.g. `"/admin"`. With `path_pattern = "/admin/*"` and
51    /// `strip_prefix = Some("/admin")`, a request for `/admin/users` is
52    /// forwarded as `/users` — same semantics as Caddy's `handle_path`.
53    pub strip_prefix: Option<String>,
54}
55
56/// A Wasm HTTP trigger: maps a path pattern to a Wasm runtime instance.
57#[derive(Debug, Clone)]
58pub struct WasmTrigger {
59    /// Path pattern (e.g., "/api/edge/*").
60    pub pattern: String,
61    /// Wasm runtime instance ID.
62    pub runtime_id: String,
63    /// Service name for logging.
64    pub service_name: String,
65}
66
67/// Callback invoked when a request matches a Wasm trigger.
68/// Receives (runtime_id, method, path, body) and returns the response body string.
69pub type WasmInvoker =
70    Arc<dyn Fn(String, String, String, String) -> WasmInvokeFuture + Send + Sync>;
71
72/// Future type returned by the Wasm invoker.
73pub type WasmInvokeFuture =
74    std::pin::Pin<Box<dyn std::future::Future<Output = Result<String, String>> + Send>>;
75
76/// Shared Wasm trigger table type.
77pub type SharedWasmTriggers = Arc<RwLock<Vec<WasmTrigger>>>;
78
79/// Run the reverse proxy on the given port.
80pub async fn run_proxy(
81    route_table: Arc<RwLock<HashMap<String, Vec<RouteTarget>>>>,
82    wasm_triggers: SharedWasmTriggers,
83    wasm_invoker: Option<WasmInvoker>,
84    port: u16,
85    tls_acceptor: Option<tokio_rustls::TlsAcceptor>,
86    acme_manager: Option<AcmeManager>,
87) -> anyhow::Result<()> {
88    let addr = format!("0.0.0.0:{port}");
89    let listener = TcpListener::bind(&addr).await?;
90    let proto = if tls_acceptor.is_some() {
91        "HTTPS"
92    } else {
93        "HTTP"
94    };
95    info!("Reverse proxy listening on {addr} ({proto})");
96
97    serve_loop(
98        listener,
99        route_table,
100        wasm_triggers,
101        wasm_invoker,
102        tls_acceptor,
103        acme_manager,
104    )
105    .await
106}
107
108/// Run the proxy with optional fallback support.
109#[allow(clippy::too_many_arguments)]
110pub async fn run_proxy_with_fallback(
111    route_table: Arc<RwLock<HashMap<String, Vec<RouteTarget>>>>,
112    wasm_triggers: SharedWasmTriggers,
113    wasm_invoker: Option<WasmInvoker>,
114    port: u16,
115    tls_acceptor: Option<tokio_rustls::TlsAcceptor>,
116    acme_manager: Option<AcmeManager>,
117    fallback: Option<FallbackConfig>,
118) -> anyhow::Result<()> {
119    let addr = format!("0.0.0.0:{port}");
120    let listener = TcpListener::bind(&addr).await?;
121    let proto = if tls_acceptor.is_some() {
122        "HTTPS"
123    } else {
124        "HTTP"
125    };
126    info!("Reverse proxy listening on {addr} ({proto})");
127
128    serve_loop_with_fallback(
129        listener,
130        route_table,
131        wasm_triggers,
132        wasm_invoker,
133        tls_acceptor,
134        acme_manager,
135        fallback,
136    )
137    .await
138}
139
140/// Shared dynamic cert resolver for hot-provisioning.
141pub type SharedCertResolver = Arc<acme::DynCertResolver>;
142
143/// Run HTTP on port 80 (for ACME challenges + redirect) and HTTPS on port 443.
144///
145/// Automatically provisions certs for all given domains via Let's Encrypt.
146/// Returns a `SharedCertResolver` that can be used to hot-provision certs
147/// for new domains added later via `orca deploy`.
148pub async fn run_proxy_with_acme(
149    route_table: Arc<RwLock<HashMap<String, Vec<RouteTarget>>>>,
150    wasm_triggers: SharedWasmTriggers,
151    wasm_invoker: Option<WasmInvoker>,
152    acme_manager: AcmeManager,
153    domains: Vec<String>,
154) -> anyhow::Result<SharedCertResolver> {
155    run_proxy_with_acme_and_fallback(
156        route_table,
157        wasm_triggers,
158        wasm_invoker,
159        acme_manager,
160        domains,
161        None,
162    )
163    .await
164}
165
166/// Run HTTP+HTTPS with ACME and optional fallback to another reverse proxy.
167#[allow(clippy::too_many_arguments)]
168pub async fn run_proxy_with_acme_and_fallback(
169    route_table: Arc<RwLock<HashMap<String, Vec<RouteTarget>>>>,
170    wasm_triggers: SharedWasmTriggers,
171    wasm_invoker: Option<WasmInvoker>,
172    acme_manager: AcmeManager,
173    domains: Vec<String>,
174    fallback: Option<FallbackConfig>,
175) -> anyhow::Result<SharedCertResolver> {
176    let resolver = Arc::new(acme::DynCertResolver::new());
177
178    let acme_mgr = acme_manager.clone();
179    let routes_clone = route_table.clone();
180    let triggers_clone = wasm_triggers.clone();
181    let invoker_clone = wasm_invoker.clone();
182    let fallback_http = fallback.clone();
183    let fallback_tls = fallback.clone();
184
185    // Start HTTP on port 80 first (needed for ACME challenge validation)
186    let http_handle = tokio::spawn({
187        let acme = acme_mgr.clone();
188        let routes = routes_clone.clone();
189        let triggers = triggers_clone.clone();
190        let invoker = invoker_clone.clone();
191        async move {
192            if let Err(e) = run_proxy_with_fallback(
193                routes,
194                triggers,
195                invoker,
196                80,
197                None,
198                Some(acme),
199                fallback_http,
200            )
201            .await
202            {
203                error!("HTTP listener failed: {e}");
204            }
205        }
206    });
207
208    // Provision certs for initial domains, then start HTTPS with SNI resolver
209    let resolver_clone = resolver.clone();
210    let https_handle = tokio::spawn(async move {
211        tokio::time::sleep(std::time::Duration::from_millis(100)).await;
212
213        // Provision all initial domain certs
214        for domain in &domains {
215            if let Err(e) = acme_mgr
216                .ensure_cert_for_resolver(domain, &resolver_clone)
217                .await
218            {
219                error!(domain = %domain, error = %e, "Failed to provision cert");
220            }
221        }
222
223        // Build TlsAcceptor with SNI resolver for multi-domain support
224        let config = rustls::ServerConfig::builder()
225            .with_no_client_auth()
226            .with_cert_resolver(resolver_clone);
227
228        let acceptor = tokio_rustls::TlsAcceptor::from(Arc::new(config));
229        info!(
230            "Starting HTTPS with SNI resolver ({} domains)",
231            domains.len()
232        );
233
234        let routes = routes_clone;
235        let triggers = triggers_clone;
236        let invoker = invoker_clone;
237        if let Err(e) = run_proxy_with_fallback(
238            routes,
239            triggers,
240            invoker,
241            443,
242            Some(acceptor),
243            Some(acme_mgr),
244            fallback_tls,
245        )
246        .await
247        {
248            error!("HTTPS listener failed: {e}");
249        }
250    });
251
252    // Don't block — return the resolver so the control plane can hot-add certs.
253    // The HTTP and HTTPS listeners run in the background.
254    tokio::spawn(async move {
255        tokio::select! {
256            _ = http_handle => warn!("HTTP listener exited"),
257            _ = https_handle => warn!("HTTPS listener exited"),
258        }
259    });
260
261    Ok(resolver)
262}
263
264/// Core accept loop shared by HTTP and HTTPS listeners.
265async fn serve_loop(
266    listener: TcpListener,
267    route_table: Arc<RwLock<HashMap<String, Vec<RouteTarget>>>>,
268    wasm_triggers: SharedWasmTriggers,
269    wasm_invoker: Option<WasmInvoker>,
270    tls_acceptor: Option<tokio_rustls::TlsAcceptor>,
271    acme_manager: Option<AcmeManager>,
272) -> anyhow::Result<()> {
273    serve_loop_with_fallback(
274        listener,
275        route_table,
276        wasm_triggers,
277        wasm_invoker,
278        tls_acceptor,
279        acme_manager,
280        None,
281    )
282    .await
283}
284
285/// Serve loop variant with fallback support for SNI passthrough and HTTP forwarding.
286#[allow(clippy::too_many_arguments)]
287pub(crate) async fn serve_loop_with_fallback(
288    listener: TcpListener,
289    route_table: Arc<RwLock<HashMap<String, Vec<RouteTarget>>>>,
290    wasm_triggers: SharedWasmTriggers,
291    wasm_invoker: Option<WasmInvoker>,
292    tls_acceptor: Option<tokio_rustls::TlsAcceptor>,
293    acme_manager: Option<AcmeManager>,
294    fallback: Option<FallbackConfig>,
295) -> anyhow::Result<()> {
296    let counter = Arc::new(AtomicUsize::new(0));
297    let client = Arc::new(
298        reqwest::Client::builder()
299            .no_proxy()
300            .redirect(reqwest::redirect::Policy::none())
301            .build()
302            .expect("failed to build HTTP client"),
303    );
304    let acme = acme_manager.map(Arc::new);
305    let is_tls = tls_acceptor.is_some();
306    let rate_limiter = RateLimiter::new();
307
308    let fallback = Arc::new(fallback);
309    loop {
310        let (stream, peer) = match listener.accept().await {
311            Ok(conn) => conn,
312            Err(e) => {
313                warn!("Proxy accept error: {e}");
314                continue;
315            }
316        };
317
318        let routes = route_table.clone();
319        let triggers = wasm_triggers.clone();
320        let invoker = wasm_invoker.clone();
321        let counter = counter.clone();
322        let client = client.clone();
323        let acme = acme.clone();
324        let tls = tls_acceptor.clone();
325        let rl = rate_limiter.clone();
326        let fb = fallback.clone();
327        let routes_for_sni = routes.clone();
328
329        tokio::spawn(async move {
330            let service = service_fn(move |req: Request<Incoming>| {
331                let routes = routes.clone();
332                let triggers = triggers.clone();
333                let invoker = invoker.clone();
334                let counter = counter.clone();
335                let client = client.clone();
336                let acme = acme.clone();
337                let rl = rl.clone();
338                async move {
339                    if let Some(resp) = handle_acme_challenge(&req, acme.as_deref()).await {
340                        return Ok(resp);
341                    }
342                    handle_request(
343                        req,
344                        &routes,
345                        &triggers,
346                        invoker.as_ref(),
347                        &counter,
348                        &client,
349                        is_tls,
350                        &rl,
351                        peer,
352                    )
353                    .await
354                }
355            });
356            if let Some(acceptor) = tls {
357                let mut stream = stream;
358                // Peek SNI to decide between local TLS termination and pass-through
359                let sni = sni::peek_sni(&mut stream).await;
360                let should_passthrough = if let Some(ref host) = sni {
361                    let routes_lock = routes_for_sni.read().await;
362                    let known = routes_lock.contains_key(host);
363                    drop(routes_lock);
364                    !known && fb.as_ref().as_ref().and_then(|f| f.tls.as_ref()).is_some()
365                } else {
366                    false
367                };
368
369                if should_passthrough {
370                    let target = fb
371                        .as_ref()
372                        .as_ref()
373                        .and_then(|f| f.tls.clone())
374                        .expect("checked above");
375                    debug!(?sni, %target, "SNI passthrough");
376                    match tokio::net::TcpStream::connect(&target).await {
377                        Ok(mut backend) => {
378                            if let Err(e) =
379                                tokio::io::copy_bidirectional(&mut stream, &mut backend).await
380                            {
381                                debug!("Passthrough copy error from {peer}: {e}");
382                            }
383                        }
384                        Err(e) => warn!("Failed to connect to TLS fallback {target}: {e}"),
385                    }
386                    return;
387                }
388
389                match acceptor.accept(stream).await {
390                    Ok(tls_stream) => {
391                        let io = TokioIo::new(tls_stream);
392                        if let Err(e) = http1::Builder::new().serve_connection(io, service).await {
393                            debug!("TLS proxy error from {peer}: {e}");
394                        }
395                    }
396                    Err(e) => debug!("TLS handshake failed from {peer}: {e}"),
397                }
398            } else {
399                let io = TokioIo::new(stream);
400                if let Err(e) = http1::Builder::new().serve_connection(io, service).await {
401                    debug!("Proxy connection error from {peer}: {e}");
402                }
403            }
404        });
405    }
406}