Skip to main content

orca_proxy/
lib.rs

1//! Reverse proxy with HTTP routing for containers and Wasm trigger dispatch.
2//!
3//! Routes HTTP traffic by `Host` header to container backends (round-robin),
4//! and by path pattern to Wasm component invocations via a callback.
5
6pub mod acme;
7mod handler;
8mod routing;
9pub mod tls;
10
11use std::collections::HashMap;
12use std::sync::Arc;
13use std::sync::atomic::AtomicUsize;
14
15use hyper::Request;
16use hyper::body::Incoming;
17use hyper::server::conn::http1;
18use hyper::service::service_fn;
19use hyper_util::rt::TokioIo;
20use tokio::net::TcpListener;
21use tokio::sync::RwLock;
22use tracing::{debug, info, warn};
23
24use acme::AcmeManager;
25use handler::{handle_acme_challenge, handle_request};
26
27/// A backend target for container routing.
28#[derive(Debug, Clone)]
29pub struct RouteTarget {
30    /// Address in the form `ip:port`.
31    pub address: String,
32    /// Owning service name.
33    pub service_name: String,
34}
35
36/// A Wasm HTTP trigger: maps a path pattern to a Wasm runtime instance.
37#[derive(Debug, Clone)]
38pub struct WasmTrigger {
39    /// Path pattern (e.g., "/api/edge/*").
40    pub pattern: String,
41    /// Wasm runtime instance ID.
42    pub runtime_id: String,
43    /// Service name for logging.
44    pub service_name: String,
45}
46
47/// Callback invoked when a request matches a Wasm trigger.
48/// Receives (runtime_id, method, path, body) and returns the response body string.
49pub type WasmInvoker =
50    Arc<dyn Fn(String, String, String, String) -> WasmInvokeFuture + Send + Sync>;
51
52/// Future type returned by the Wasm invoker.
53pub type WasmInvokeFuture =
54    std::pin::Pin<Box<dyn std::future::Future<Output = Result<String, String>> + Send>>;
55
56/// Shared Wasm trigger table type.
57pub type SharedWasmTriggers = Arc<RwLock<Vec<WasmTrigger>>>;
58
59/// Run the reverse proxy on the given port.
60///
61/// Routes by Host header to container backends, and by path pattern to Wasm components.
62///
63/// # Errors
64///
65/// Returns an error if the proxy fails to bind to the port.
66pub async fn run_proxy(
67    route_table: Arc<RwLock<HashMap<String, Vec<RouteTarget>>>>,
68    wasm_triggers: SharedWasmTriggers,
69    wasm_invoker: Option<WasmInvoker>,
70    port: u16,
71    tls_acceptor: Option<tokio_rustls::TlsAcceptor>,
72    acme_manager: Option<AcmeManager>,
73) -> anyhow::Result<()> {
74    let addr = format!("0.0.0.0:{port}");
75    let listener = TcpListener::bind(&addr).await?;
76    let proto = if tls_acceptor.is_some() {
77        "HTTPS"
78    } else {
79        "HTTP"
80    };
81    info!("Reverse proxy listening on {addr} ({proto})");
82
83    let counter = Arc::new(AtomicUsize::new(0));
84    let client = Arc::new(reqwest::Client::new());
85    let acme = acme_manager.map(Arc::new);
86
87    loop {
88        let (stream, peer) = match listener.accept().await {
89            Ok(conn) => conn,
90            Err(e) => {
91                warn!("Proxy accept error: {e}");
92                continue;
93            }
94        };
95
96        let routes = route_table.clone();
97        let triggers = wasm_triggers.clone();
98        let invoker = wasm_invoker.clone();
99        let counter = counter.clone();
100        let client = client.clone();
101        let acme = acme.clone();
102
103        let tls = tls_acceptor.clone();
104        tokio::spawn(async move {
105            let service = service_fn(move |req: Request<Incoming>| {
106                let routes = routes.clone();
107                let triggers = triggers.clone();
108                let invoker = invoker.clone();
109                let counter = counter.clone();
110                let client = client.clone();
111                let acme = acme.clone();
112                async move {
113                    // Intercept ACME challenge requests before normal routing
114                    if let Some(resp) = handle_acme_challenge(&req, acme.as_deref()).await {
115                        return Ok(resp);
116                    }
117                    handle_request(req, &routes, &triggers, invoker.as_ref(), &counter, &client)
118                        .await
119                }
120            });
121
122            if let Some(acceptor) = tls {
123                match acceptor.accept(stream).await {
124                    Ok(tls_stream) => {
125                        let io = TokioIo::new(tls_stream);
126                        if let Err(e) = http1::Builder::new().serve_connection(io, service).await {
127                            debug!("TLS proxy error from {peer}: {e}");
128                        }
129                    }
130                    Err(e) => debug!("TLS handshake failed from {peer}: {e}"),
131                }
132            } else {
133                let io = TokioIo::new(stream);
134                if let Err(e) = http1::Builder::new().serve_connection(io, service).await {
135                    debug!("Proxy connection error from {peer}: {e}");
136                }
137            }
138        });
139    }
140}