// orca_proxy/lib.rs

//! Reverse proxy with HTTP routing for containers and Wasm trigger dispatch.
//!
//! Routes HTTP traffic by `Host` header to container backends (round-robin),
//! and by path pattern to Wasm component invocations via a callback.
//! Supports automatic TLS via ACME/Let's Encrypt (Caddy-style zero-config).
6
7pub mod acme;
8mod handler;
9mod routing;
10pub mod tls;
11
12use std::collections::HashMap;
13use std::sync::Arc;
14use std::sync::atomic::AtomicUsize;
15
16use hyper::Request;
17use hyper::body::Incoming;
18use hyper::server::conn::http1;
19use hyper::service::service_fn;
20use hyper_util::rt::TokioIo;
21use tokio::net::TcpListener;
22use tokio::sync::RwLock;
23use tracing::{debug, error, info, warn};
24
25use acme::AcmeManager;
26use handler::{handle_acme_challenge, handle_request};
27
/// A backend target for container routing.
///
/// Targets compare by value (`PartialEq`/`Eq`), so two entries are equal when
/// their address, service name and path pattern all match.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RouteTarget {
    /// Address in the form `ip:port`.
    pub address: String,
    /// Owning service name.
    pub service_name: String,
    /// Optional path pattern (e.g., `"/api/*"`). When `None`, this target is a
    /// catch-all for the domain. When `Some`, only requests whose path matches
    /// the pattern are routed here. Longest-prefix match wins.
    pub path_pattern: Option<String>,
}
40
/// A Wasm HTTP trigger: maps a path pattern to a Wasm runtime instance.
///
/// Triggers compare by value (`PartialEq`/`Eq`) across all three fields.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct WasmTrigger {
    /// Path pattern (e.g., "/api/edge/*").
    pub pattern: String,
    /// Wasm runtime instance ID.
    pub runtime_id: String,
    /// Service name for logging.
    pub service_name: String,
}
51
/// Callback invoked when a request matches a Wasm trigger.
///
/// Receives `(runtime_id, method, path, body)` as owned strings and returns a
/// [`WasmInvokeFuture`] that resolves to the response body string. The callback
/// is shared behind an `Arc` and must be `Send + Sync` so it can be cloned into
/// every connection task.
pub type WasmInvoker =
    Arc<dyn Fn(String, String, String, String) -> WasmInvokeFuture + Send + Sync>;

/// Future type returned by the Wasm invoker.
///
/// A boxed, pinned, `Send` future yielding `Ok(response_body)` on success or
/// `Err(message)` on failure (the error is a plain `String`).
pub type WasmInvokeFuture =
    std::pin::Pin<Box<dyn std::future::Future<Output = Result<String, String>> + Send>>;
60
61/// Shared Wasm trigger table type.
62pub type SharedWasmTriggers = Arc<RwLock<Vec<WasmTrigger>>>;
63
64/// Run the reverse proxy on the given port.
65///
66/// Routes by Host header to container backends, and by path pattern to Wasm
67/// components.
68pub async fn run_proxy(
69    route_table: Arc<RwLock<HashMap<String, Vec<RouteTarget>>>>,
70    wasm_triggers: SharedWasmTriggers,
71    wasm_invoker: Option<WasmInvoker>,
72    port: u16,
73    tls_acceptor: Option<tokio_rustls::TlsAcceptor>,
74    acme_manager: Option<AcmeManager>,
75) -> anyhow::Result<()> {
76    let addr = format!("0.0.0.0:{port}");
77    let listener = TcpListener::bind(&addr).await?;
78    let proto = if tls_acceptor.is_some() {
79        "HTTPS"
80    } else {
81        "HTTP"
82    };
83    info!("Reverse proxy listening on {addr} ({proto})");
84
85    serve_loop(
86        listener,
87        route_table,
88        wasm_triggers,
89        wasm_invoker,
90        tls_acceptor,
91        acme_manager,
92    )
93    .await
94}
95
96/// Run HTTP on port 80 (for ACME challenges + redirect) and HTTPS on port 443.
97///
98/// Automatically provisions certs for all given domains via Let's Encrypt.
99pub async fn run_proxy_with_acme(
100    route_table: Arc<RwLock<HashMap<String, Vec<RouteTarget>>>>,
101    wasm_triggers: SharedWasmTriggers,
102    wasm_invoker: Option<WasmInvoker>,
103    acme_manager: AcmeManager,
104    domains: Vec<String>,
105) -> anyhow::Result<()> {
106    let provider = acme_manager.provider();
107
108    // Provision certs for all domains (spawned so HTTP-01 challenges can be
109    // served concurrently on port 80).
110    let acme_mgr = acme_manager.clone();
111    let routes_clone = route_table.clone();
112    let triggers_clone = wasm_triggers.clone();
113    let invoker_clone = wasm_invoker.clone();
114
115    // Start HTTP on port 80 first (needed for ACME challenge validation)
116    let http_handle = tokio::spawn({
117        let acme = acme_mgr.clone();
118        let routes = routes_clone.clone();
119        let triggers = triggers_clone.clone();
120        let invoker = invoker_clone.clone();
121        async move {
122            if let Err(e) = run_proxy(routes, triggers, invoker, 80, None, Some(acme)).await {
123                error!("HTTP listener failed: {e}");
124            }
125        }
126    });
127
128    // Provision certs then start HTTPS
129    let https_handle = tokio::spawn(async move {
130        // Small delay to let HTTP listener start
131        tokio::time::sleep(std::time::Duration::from_millis(100)).await;
132
133        for domain in &domains {
134            info!(domain = %domain, "Auto-provisioning TLS certificate");
135            match provider.ensure_cert(domain).await {
136                Ok(acceptor) => {
137                    info!(domain = %domain, "TLS cert ready, starting HTTPS");
138                    let routes = routes_clone.clone();
139                    let triggers = triggers_clone.clone();
140                    let invoker = invoker_clone.clone();
141                    let acme = acme_mgr.clone();
142                    // Start HTTPS with the first successful cert
143                    if let Err(e) =
144                        run_proxy(routes, triggers, invoker, 443, Some(acceptor), Some(acme)).await
145                    {
146                        error!("HTTPS listener failed: {e}");
147                    }
148                    return;
149                }
150                Err(e) => {
151                    error!(domain = %domain, error = %e, "Failed to provision cert");
152                }
153            }
154        }
155        error!("No TLS certs could be provisioned — HTTPS not started");
156    });
157
158    tokio::select! {
159        _ = http_handle => warn!("HTTP listener exited"),
160        _ = https_handle => warn!("HTTPS listener exited"),
161    }
162    Ok(())
163}
164
165/// Core accept loop shared by HTTP and HTTPS listeners.
166async fn serve_loop(
167    listener: TcpListener,
168    route_table: Arc<RwLock<HashMap<String, Vec<RouteTarget>>>>,
169    wasm_triggers: SharedWasmTriggers,
170    wasm_invoker: Option<WasmInvoker>,
171    tls_acceptor: Option<tokio_rustls::TlsAcceptor>,
172    acme_manager: Option<AcmeManager>,
173) -> anyhow::Result<()> {
174    let counter = Arc::new(AtomicUsize::new(0));
175    let client = Arc::new(
176        reqwest::Client::builder()
177            .no_proxy()
178            .build()
179            .expect("failed to build HTTP client"),
180    );
181    let acme = acme_manager.map(Arc::new);
182
183    loop {
184        let (stream, peer) = match listener.accept().await {
185            Ok(conn) => conn,
186            Err(e) => {
187                warn!("Proxy accept error: {e}");
188                continue;
189            }
190        };
191
192        let routes = route_table.clone();
193        let triggers = wasm_triggers.clone();
194        let invoker = wasm_invoker.clone();
195        let counter = counter.clone();
196        let client = client.clone();
197        let acme = acme.clone();
198        let tls = tls_acceptor.clone();
199
200        tokio::spawn(async move {
201            let service = service_fn(move |req: Request<Incoming>| {
202                let routes = routes.clone();
203                let triggers = triggers.clone();
204                let invoker = invoker.clone();
205                let counter = counter.clone();
206                let client = client.clone();
207                let acme = acme.clone();
208                async move {
209                    if let Some(resp) = handle_acme_challenge(&req, acme.as_deref()).await {
210                        return Ok(resp);
211                    }
212                    handle_request(req, &routes, &triggers, invoker.as_ref(), &counter, &client)
213                        .await
214                }
215            });
216
217            if let Some(acceptor) = tls {
218                match acceptor.accept(stream).await {
219                    Ok(tls_stream) => {
220                        let io = TokioIo::new(tls_stream);
221                        if let Err(e) = http1::Builder::new().serve_connection(io, service).await {
222                            debug!("TLS proxy error from {peer}: {e}");
223                        }
224                    }
225                    Err(e) => debug!("TLS handshake failed from {peer}: {e}"),
226                }
227            } else {
228                let io = TokioIo::new(stream);
229                if let Err(e) = http1::Builder::new().serve_connection(io, service).await {
230                    debug!("Proxy connection error from {peer}: {e}");
231                }
232            }
233        });
234    }
235}