Skip to main content

orca_proxy/
lib.rs

1//! Reverse proxy with HTTP routing for containers and Wasm trigger dispatch.
2//!
3//! Routes HTTP traffic by `Host` header to container backends (round-robin),
4//! and by path pattern to Wasm component invocations via a callback.
5//! Supports automatic TLS via ACME/Let's Encrypt (Caddy-style zero-config).
6
7pub mod acme;
8mod forward;
9mod handler;
10pub mod rate_limit;
11mod routing;
12pub mod tls;
13
14use std::collections::HashMap;
15use std::sync::Arc;
16use std::sync::atomic::AtomicUsize;
17
18use hyper::Request;
19use hyper::body::Incoming;
20use hyper::server::conn::http1;
21use hyper::service::service_fn;
22use hyper_util::rt::TokioIo;
23use tokio::net::TcpListener;
24use tokio::sync::RwLock;
25use tracing::{debug, error, info, warn};
26
27use acme::AcmeManager;
28use handler::{handle_acme_challenge, handle_request};
29use rate_limit::RateLimiter;
30
31/// A backend target for container routing.
32#[derive(Debug, Clone)]
33pub struct RouteTarget {
34    /// Address in the form `ip:port`.
35    pub address: String,
36    /// Owning service name.
37    pub service_name: String,
38    /// Optional path pattern (e.g., `"/api/*"`). When `None`, this target is a
39    /// catch-all for the domain. When `Some`, only requests whose path matches
40    /// the pattern are routed here. Longest-prefix match wins.
41    pub path_pattern: Option<String>,
42}
43
44/// A Wasm HTTP trigger: maps a path pattern to a Wasm runtime instance.
45#[derive(Debug, Clone)]
46pub struct WasmTrigger {
47    /// Path pattern (e.g., "/api/edge/*").
48    pub pattern: String,
49    /// Wasm runtime instance ID.
50    pub runtime_id: String,
51    /// Service name for logging.
52    pub service_name: String,
53}
54
55/// Callback invoked when a request matches a Wasm trigger.
56/// Receives (runtime_id, method, path, body) and returns the response body string.
57pub type WasmInvoker =
58    Arc<dyn Fn(String, String, String, String) -> WasmInvokeFuture + Send + Sync>;
59
60/// Future type returned by the Wasm invoker.
61pub type WasmInvokeFuture =
62    std::pin::Pin<Box<dyn std::future::Future<Output = Result<String, String>> + Send>>;
63
64/// Shared Wasm trigger table type.
65pub type SharedWasmTriggers = Arc<RwLock<Vec<WasmTrigger>>>;
66
67/// Run the reverse proxy on the given port.
68///
69/// Routes by Host header to container backends, and by path pattern to Wasm
70/// components.
71pub async fn run_proxy(
72    route_table: Arc<RwLock<HashMap<String, Vec<RouteTarget>>>>,
73    wasm_triggers: SharedWasmTriggers,
74    wasm_invoker: Option<WasmInvoker>,
75    port: u16,
76    tls_acceptor: Option<tokio_rustls::TlsAcceptor>,
77    acme_manager: Option<AcmeManager>,
78) -> anyhow::Result<()> {
79    let addr = format!("0.0.0.0:{port}");
80    let listener = TcpListener::bind(&addr).await?;
81    let proto = if tls_acceptor.is_some() {
82        "HTTPS"
83    } else {
84        "HTTP"
85    };
86    info!("Reverse proxy listening on {addr} ({proto})");
87
88    serve_loop(
89        listener,
90        route_table,
91        wasm_triggers,
92        wasm_invoker,
93        tls_acceptor,
94        acme_manager,
95    )
96    .await
97}
98
99/// Shared dynamic cert resolver for hot-provisioning.
100pub type SharedCertResolver = Arc<acme::DynCertResolver>;
101
102/// Run HTTP on port 80 (for ACME challenges + redirect) and HTTPS on port 443.
103///
104/// Automatically provisions certs for all given domains via Let's Encrypt.
105/// Returns a `SharedCertResolver` that can be used to hot-provision certs
106/// for new domains added later via `orca deploy`.
107pub async fn run_proxy_with_acme(
108    route_table: Arc<RwLock<HashMap<String, Vec<RouteTarget>>>>,
109    wasm_triggers: SharedWasmTriggers,
110    wasm_invoker: Option<WasmInvoker>,
111    acme_manager: AcmeManager,
112    domains: Vec<String>,
113) -> anyhow::Result<SharedCertResolver> {
114    let resolver = Arc::new(acme::DynCertResolver::new());
115
116    let acme_mgr = acme_manager.clone();
117    let routes_clone = route_table.clone();
118    let triggers_clone = wasm_triggers.clone();
119    let invoker_clone = wasm_invoker.clone();
120
121    // Start HTTP on port 80 first (needed for ACME challenge validation)
122    let http_handle = tokio::spawn({
123        let acme = acme_mgr.clone();
124        let routes = routes_clone.clone();
125        let triggers = triggers_clone.clone();
126        let invoker = invoker_clone.clone();
127        async move {
128            if let Err(e) = run_proxy(routes, triggers, invoker, 80, None, Some(acme)).await {
129                error!("HTTP listener failed: {e}");
130            }
131        }
132    });
133
134    // Provision certs for initial domains, then start HTTPS with SNI resolver
135    let resolver_clone = resolver.clone();
136    let https_handle = tokio::spawn(async move {
137        tokio::time::sleep(std::time::Duration::from_millis(100)).await;
138
139        // Provision all initial domain certs
140        for domain in &domains {
141            if let Err(e) = acme_mgr
142                .ensure_cert_for_resolver(domain, &resolver_clone)
143                .await
144            {
145                error!(domain = %domain, error = %e, "Failed to provision cert");
146            }
147        }
148
149        // Build TlsAcceptor with SNI resolver for multi-domain support
150        let config = rustls::ServerConfig::builder()
151            .with_no_client_auth()
152            .with_cert_resolver(resolver_clone);
153
154        let acceptor = tokio_rustls::TlsAcceptor::from(Arc::new(config));
155        info!(
156            "Starting HTTPS with SNI resolver ({} domains)",
157            domains.len()
158        );
159
160        let routes = routes_clone;
161        let triggers = triggers_clone;
162        let invoker = invoker_clone;
163        if let Err(e) = run_proxy(
164            routes,
165            triggers,
166            invoker,
167            443,
168            Some(acceptor),
169            Some(acme_mgr),
170        )
171        .await
172        {
173            error!("HTTPS listener failed: {e}");
174        }
175    });
176
177    // Don't block — return the resolver so the control plane can hot-add certs.
178    // The HTTP and HTTPS listeners run in the background.
179    tokio::spawn(async move {
180        tokio::select! {
181            _ = http_handle => warn!("HTTP listener exited"),
182            _ = https_handle => warn!("HTTPS listener exited"),
183        }
184    });
185
186    Ok(resolver)
187}
188
189/// Core accept loop shared by HTTP and HTTPS listeners.
190async fn serve_loop(
191    listener: TcpListener,
192    route_table: Arc<RwLock<HashMap<String, Vec<RouteTarget>>>>,
193    wasm_triggers: SharedWasmTriggers,
194    wasm_invoker: Option<WasmInvoker>,
195    tls_acceptor: Option<tokio_rustls::TlsAcceptor>,
196    acme_manager: Option<AcmeManager>,
197) -> anyhow::Result<()> {
198    let counter = Arc::new(AtomicUsize::new(0));
199    let client = Arc::new(
200        reqwest::Client::builder()
201            .no_proxy()
202            .build()
203            .expect("failed to build HTTP client"),
204    );
205    let acme = acme_manager.map(Arc::new);
206    let is_tls = tls_acceptor.is_some();
207    let rate_limiter = RateLimiter::new();
208
209    loop {
210        let (stream, peer) = match listener.accept().await {
211            Ok(conn) => conn,
212            Err(e) => {
213                warn!("Proxy accept error: {e}");
214                continue;
215            }
216        };
217
218        let routes = route_table.clone();
219        let triggers = wasm_triggers.clone();
220        let invoker = wasm_invoker.clone();
221        let counter = counter.clone();
222        let client = client.clone();
223        let acme = acme.clone();
224        let tls = tls_acceptor.clone();
225        let rl = rate_limiter.clone();
226
227        tokio::spawn(async move {
228            let service = service_fn(move |req: Request<Incoming>| {
229                let routes = routes.clone();
230                let triggers = triggers.clone();
231                let invoker = invoker.clone();
232                let counter = counter.clone();
233                let client = client.clone();
234                let acme = acme.clone();
235                let rl = rl.clone();
236                async move {
237                    if let Some(resp) = handle_acme_challenge(&req, acme.as_deref()).await {
238                        return Ok(resp);
239                    }
240                    handle_request(
241                        req,
242                        &routes,
243                        &triggers,
244                        invoker.as_ref(),
245                        &counter,
246                        &client,
247                        is_tls,
248                        &rl,
249                        peer,
250                    )
251                    .await
252                }
253            });
254
255            if let Some(acceptor) = tls {
256                match acceptor.accept(stream).await {
257                    Ok(tls_stream) => {
258                        let io = TokioIo::new(tls_stream);
259                        if let Err(e) = http1::Builder::new().serve_connection(io, service).await {
260                            debug!("TLS proxy error from {peer}: {e}");
261                        }
262                    }
263                    Err(e) => debug!("TLS handshake failed from {peer}: {e}"),
264                }
265            } else {
266                let io = TokioIo::new(stream);
267                if let Err(e) = http1::Builder::new().serve_connection(io, service).await {
268                    debug!("Proxy connection error from {peer}: {e}");
269                }
270            }
271        });
272    }
273}