// orca_proxy/lib.rs

//! Reverse proxy with HTTP routing for containers and Wasm trigger dispatch.
//!
//! Routes HTTP traffic by `Host` header to container backends (round-robin),
//! and by path pattern to Wasm component invocations via a callback.

mod handler;
mod routing;

use std::collections::HashMap;
use std::io::ErrorKind;
use std::sync::Arc;
use std::sync::atomic::AtomicUsize;
use std::time::Duration;

use hyper::Request;
use hyper::body::Incoming;
use hyper::server::conn::http1;
use hyper::service::service_fn;
use hyper_util::rt::TokioIo;
use tokio::net::TcpListener;
use tokio::sync::RwLock;
use tracing::{debug, info, warn};

use handler::handle_request;

/// A single container backend that the proxy can forward a request to.
///
/// Route tables map a host name to a list of these targets.
#[derive(Debug, Clone)]
pub struct RouteTarget {
    /// Backend address in the form `ip:port`.
    pub address: String,
    /// Name of the service that owns this backend.
    pub service_name: String,
}

/// A Wasm HTTP trigger: maps a path pattern to a Wasm runtime instance.
#[derive(Debug, Clone)]
pub struct WasmTrigger {
    /// Path pattern that selects this trigger (e.g., "/api/edge/*").
    pub pattern: String,
    /// ID of the Wasm runtime instance to invoke on a match.
    pub runtime_id: String,
    /// Service name, used for logging.
    pub service_name: String,
}

/// Callback invoked when a request matches a Wasm trigger.
///
/// Called with `(runtime_id, method, path, body)`, all as owned `String`s, and
/// returns a [`WasmInvokeFuture`]. The callback is shared behind an `Arc` and
/// must be `Send + Sync` so it can be cloned into every connection task.
pub type WasmInvoker =
    Arc<dyn Fn(String, String, String, String) -> WasmInvokeFuture + Send + Sync>;

/// Future type returned by the Wasm invoker.
///
/// Resolves to `Ok` with the response body string, or to `Err` with a `String`
/// error value. The future is boxed, pinned and `Send`.
pub type WasmInvokeFuture =
    std::pin::Pin<Box<dyn std::future::Future<Output = Result<String, String>> + Send>>;

53/// Shared Wasm trigger table type.
54pub type SharedWasmTriggers = Arc<RwLock<Vec<WasmTrigger>>>;
55
56/// Run the reverse proxy on the given port.
57///
58/// Routes by Host header to container backends, and by path pattern to Wasm components.
59///
60/// # Errors
61///
62/// Returns an error if the proxy fails to bind to the port.
63pub async fn run_proxy(
64    route_table: Arc<RwLock<HashMap<String, Vec<RouteTarget>>>>,
65    wasm_triggers: SharedWasmTriggers,
66    wasm_invoker: Option<WasmInvoker>,
67    port: u16,
68) -> anyhow::Result<()> {
69    let addr = format!("0.0.0.0:{port}");
70    let listener = TcpListener::bind(&addr).await?;
71    info!("Reverse proxy listening on {addr}");
72
73    let counter = Arc::new(AtomicUsize::new(0));
74    let client = Arc::new(reqwest::Client::new());
75
76    loop {
77        let (stream, peer) = match listener.accept().await {
78            Ok(conn) => conn,
79            Err(e) => {
80                warn!("Proxy accept error: {e}");
81                continue;
82            }
83        };
84
85        let routes = route_table.clone();
86        let triggers = wasm_triggers.clone();
87        let invoker = wasm_invoker.clone();
88        let counter = counter.clone();
89        let client = client.clone();
90
91        tokio::spawn(async move {
92            let io = TokioIo::new(stream);
93            let service = service_fn(move |req: Request<Incoming>| {
94                let routes = routes.clone();
95                let triggers = triggers.clone();
96                let invoker = invoker.clone();
97                let counter = counter.clone();
98                let client = client.clone();
99                async move {
100                    handle_request(req, &routes, &triggers, invoker.as_ref(), &counter, &client)
101                        .await
102                }
103            });
104
105            if let Err(e) = http1::Builder::new().serve_connection(io, service).await {
106                debug!("Proxy connection error from {peer}: {e}");
107            }
108        });
109    }
110}