Skip to main content

orca_proxy/
lib.rs

1//! Reverse proxy with HTTP routing for containers and Wasm trigger dispatch.
2//!
3//! Routes HTTP traffic by `Host` header to container backends (round-robin),
4//! and by path pattern to Wasm component invocations via a callback.
5//! Supports automatic TLS via ACME/Let's Encrypt (Caddy-style zero-config).
6
7pub mod acme;
8mod handler;
9mod routing;
10pub mod tls;
11
12use std::collections::HashMap;
13use std::sync::Arc;
14use std::sync::atomic::AtomicUsize;
15
16use hyper::Request;
17use hyper::body::Incoming;
18use hyper::server::conn::http1;
19use hyper::service::service_fn;
20use hyper_util::rt::TokioIo;
21use tokio::net::TcpListener;
22use tokio::sync::RwLock;
23use tracing::{debug, error, info, warn};
24
25use acme::AcmeManager;
26use handler::{handle_acme_challenge, handle_request};
27
/// One container backend that the proxy can forward a request to.
///
/// Targets are stored per domain in the route table handed to [`run_proxy`].
#[derive(Clone, Debug)]
pub struct RouteTarget {
    /// Backend socket address, written as `ip:port`.
    pub address: String,
    /// Name of the service that owns this backend.
    pub service_name: String,
    /// Optional path pattern such as `"/api/*"`.
    ///
    /// `None` makes this target the catch-all for its domain. `Some` restricts
    /// it to requests whose path matches the pattern; when several patterns
    /// match, the longest prefix wins.
    pub path_pattern: Option<String>,
}
40
/// An HTTP trigger that binds a path pattern to a Wasm runtime instance.
#[derive(Clone, Debug)]
pub struct WasmTrigger {
    /// Path pattern the trigger responds to (e.g., "/api/edge/*").
    pub pattern: String,
    /// ID of the Wasm runtime instance to invoke.
    pub runtime_id: String,
    /// Name of the owning service; used for logging.
    pub service_name: String,
}
51
/// Boxed, sendable future produced by a [`WasmInvoker`].
///
/// Resolves to `Ok(String)` or `Err(String)`.
pub type WasmInvokeFuture =
    std::pin::Pin<Box<dyn std::future::Future<Output = Result<String, String>> + Send>>;

/// Callback the proxy invokes when a request matches a Wasm trigger.
///
/// The four `String` arguments are, in order, `(runtime_id, method, path, body)`;
/// the returned future yields the response body string.
pub type WasmInvoker =
    Arc<dyn Fn(String, String, String, String) -> WasmInvokeFuture + Send + Sync>;
60
61/// Shared Wasm trigger table type.
62pub type SharedWasmTriggers = Arc<RwLock<Vec<WasmTrigger>>>;
63
64/// Run the reverse proxy on the given port.
65///
66/// Routes by Host header to container backends, and by path pattern to Wasm
67/// components.
68pub async fn run_proxy(
69    route_table: Arc<RwLock<HashMap<String, Vec<RouteTarget>>>>,
70    wasm_triggers: SharedWasmTriggers,
71    wasm_invoker: Option<WasmInvoker>,
72    port: u16,
73    tls_acceptor: Option<tokio_rustls::TlsAcceptor>,
74    acme_manager: Option<AcmeManager>,
75) -> anyhow::Result<()> {
76    let addr = format!("0.0.0.0:{port}");
77    let listener = TcpListener::bind(&addr).await?;
78    let proto = if tls_acceptor.is_some() {
79        "HTTPS"
80    } else {
81        "HTTP"
82    };
83    info!("Reverse proxy listening on {addr} ({proto})");
84
85    serve_loop(
86        listener,
87        route_table,
88        wasm_triggers,
89        wasm_invoker,
90        tls_acceptor,
91        acme_manager,
92    )
93    .await
94}
95
96/// Shared dynamic cert resolver for hot-provisioning.
97pub type SharedCertResolver = Arc<acme::DynCertResolver>;
98
99/// Run HTTP on port 80 (for ACME challenges + redirect) and HTTPS on port 443.
100///
101/// Automatically provisions certs for all given domains via Let's Encrypt.
102/// Returns a `SharedCertResolver` that can be used to hot-provision certs
103/// for new domains added later via `orca deploy`.
104pub async fn run_proxy_with_acme(
105    route_table: Arc<RwLock<HashMap<String, Vec<RouteTarget>>>>,
106    wasm_triggers: SharedWasmTriggers,
107    wasm_invoker: Option<WasmInvoker>,
108    acme_manager: AcmeManager,
109    domains: Vec<String>,
110) -> anyhow::Result<SharedCertResolver> {
111    let resolver = Arc::new(acme::DynCertResolver::new());
112
113    let acme_mgr = acme_manager.clone();
114    let routes_clone = route_table.clone();
115    let triggers_clone = wasm_triggers.clone();
116    let invoker_clone = wasm_invoker.clone();
117
118    // Start HTTP on port 80 first (needed for ACME challenge validation)
119    let http_handle = tokio::spawn({
120        let acme = acme_mgr.clone();
121        let routes = routes_clone.clone();
122        let triggers = triggers_clone.clone();
123        let invoker = invoker_clone.clone();
124        async move {
125            if let Err(e) = run_proxy(routes, triggers, invoker, 80, None, Some(acme)).await {
126                error!("HTTP listener failed: {e}");
127            }
128        }
129    });
130
131    // Provision certs for initial domains, then start HTTPS with SNI resolver
132    let resolver_clone = resolver.clone();
133    let https_handle = tokio::spawn(async move {
134        tokio::time::sleep(std::time::Duration::from_millis(100)).await;
135
136        // Provision all initial domain certs
137        for domain in &domains {
138            if let Err(e) = acme_mgr
139                .ensure_cert_for_resolver(domain, &resolver_clone)
140                .await
141            {
142                error!(domain = %domain, error = %e, "Failed to provision cert");
143            }
144        }
145
146        // Build TlsAcceptor with SNI resolver for multi-domain support
147        let config = rustls::ServerConfig::builder()
148            .with_no_client_auth()
149            .with_cert_resolver(resolver_clone);
150
151        let acceptor = tokio_rustls::TlsAcceptor::from(Arc::new(config));
152        info!(
153            "Starting HTTPS with SNI resolver ({} domains)",
154            domains.len()
155        );
156
157        let routes = routes_clone;
158        let triggers = triggers_clone;
159        let invoker = invoker_clone;
160        if let Err(e) = run_proxy(
161            routes,
162            triggers,
163            invoker,
164            443,
165            Some(acceptor),
166            Some(acme_mgr),
167        )
168        .await
169        {
170            error!("HTTPS listener failed: {e}");
171        }
172    });
173
174    // Don't block — return the resolver so the control plane can hot-add certs.
175    // The HTTP and HTTPS listeners run in the background.
176    tokio::spawn(async move {
177        tokio::select! {
178            _ = http_handle => warn!("HTTP listener exited"),
179            _ = https_handle => warn!("HTTPS listener exited"),
180        }
181    });
182
183    Ok(resolver)
184}
185
186/// Core accept loop shared by HTTP and HTTPS listeners.
187async fn serve_loop(
188    listener: TcpListener,
189    route_table: Arc<RwLock<HashMap<String, Vec<RouteTarget>>>>,
190    wasm_triggers: SharedWasmTriggers,
191    wasm_invoker: Option<WasmInvoker>,
192    tls_acceptor: Option<tokio_rustls::TlsAcceptor>,
193    acme_manager: Option<AcmeManager>,
194) -> anyhow::Result<()> {
195    let counter = Arc::new(AtomicUsize::new(0));
196    let client = Arc::new(
197        reqwest::Client::builder()
198            .no_proxy()
199            .build()
200            .expect("failed to build HTTP client"),
201    );
202    let acme = acme_manager.map(Arc::new);
203
204    loop {
205        let (stream, peer) = match listener.accept().await {
206            Ok(conn) => conn,
207            Err(e) => {
208                warn!("Proxy accept error: {e}");
209                continue;
210            }
211        };
212
213        let routes = route_table.clone();
214        let triggers = wasm_triggers.clone();
215        let invoker = wasm_invoker.clone();
216        let counter = counter.clone();
217        let client = client.clone();
218        let acme = acme.clone();
219        let tls = tls_acceptor.clone();
220
221        tokio::spawn(async move {
222            let service = service_fn(move |req: Request<Incoming>| {
223                let routes = routes.clone();
224                let triggers = triggers.clone();
225                let invoker = invoker.clone();
226                let counter = counter.clone();
227                let client = client.clone();
228                let acme = acme.clone();
229                async move {
230                    if let Some(resp) = handle_acme_challenge(&req, acme.as_deref()).await {
231                        return Ok(resp);
232                    }
233                    handle_request(req, &routes, &triggers, invoker.as_ref(), &counter, &client)
234                        .await
235                }
236            });
237
238            if let Some(acceptor) = tls {
239                match acceptor.accept(stream).await {
240                    Ok(tls_stream) => {
241                        let io = TokioIo::new(tls_stream);
242                        if let Err(e) = http1::Builder::new().serve_connection(io, service).await {
243                            debug!("TLS proxy error from {peer}: {e}");
244                        }
245                    }
246                    Err(e) => debug!("TLS handshake failed from {peer}: {e}"),
247                }
248            } else {
249                let io = TokioIo::new(stream);
250                if let Err(e) = http1::Builder::new().serve_connection(io, service).await {
251                    debug!("Proxy connection error from {peer}: {e}");
252                }
253            }
254        });
255    }
256}