orca-proxy 0.2.8

Reverse proxy with HTTP routing and Wasm trigger dispatch
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
//! Reverse proxy with HTTP routing for containers and Wasm trigger dispatch.
//!
//! Routes HTTP traffic by `Host` header to container backends (round-robin),
//! and by path pattern to Wasm component invocations via a callback.
//! Supports automatic TLS via ACME/Let's Encrypt (Caddy-style zero-config).

pub mod acme;
mod forward;
mod handler;
pub mod rate_limit;
mod routing;
pub mod sni;
pub mod tls;
mod websocket;

pub use orca_core::config::FallbackConfig;

use std::collections::HashMap;
use std::sync::Arc;
use std::sync::atomic::AtomicUsize;

use hyper::Request;
use hyper::body::Incoming;
use hyper::server::conn::http1;
use hyper::service::service_fn;
use hyper_util::rt::TokioIo;
use tokio::net::TcpListener;
use tokio::sync::RwLock;
use tracing::{debug, error, info, warn};

use acme::AcmeManager;
use handler::{handle_acme_challenge, handle_request};
use rate_limit::RateLimiter;

/// A backend target for container routing.
///
/// The proxy entry points in this module take a route table of type
/// `HashMap<String, Vec<RouteTarget>>`; per the module docs the key is the
/// request's `Host`, and each entry lists the backends that may serve it.
#[derive(Debug, Clone)]
pub struct RouteTarget {
    /// Upstream address in the form `ip:port`.
    pub address: String,
    /// Name of the service that owns this backend.
    pub service_name: String,
    /// Optional path pattern (e.g., `"/api/*"`). When `None`, this target is a
    /// catch-all for the domain. When `Some`, only requests whose path matches
    /// the pattern are routed here. Longest-prefix match wins.
    pub path_pattern: Option<String>,
    /// Traffic weight (1-100, default 100). Used for weighted routing
    /// during canary deployments. Higher weight = more traffic.
    ///
    /// NOTE(review): neither the range nor the default is enforced by this
    /// type (there is no `Default` impl here) — confirm where they are applied.
    pub weight: u32,
    /// Prefix to strip from the request path before forwarding upstream,
    /// e.g. `"/admin"`. With `path_pattern = "/admin/*"` and
    /// `strip_prefix = Some("/admin")`, a request for `/admin/users` is
    /// forwarded as `/users` — same semantics as Caddy's `handle_path`.
    pub strip_prefix: Option<String>,
}

/// A Wasm HTTP trigger: maps a path pattern to a Wasm runtime instance.
///
/// Triggers are shared with the proxy as a [`SharedWasmTriggers`] table.
#[derive(Debug, Clone)]
pub struct WasmTrigger {
    /// Path pattern that selects this trigger (e.g., "/api/edge/*").
    pub pattern: String,
    /// ID of the Wasm runtime instance to invoke; presumably the value passed
    /// as the first argument of [`WasmInvoker`] — verify against the handler.
    pub runtime_id: String,
    /// Service name, used for logging.
    pub service_name: String,
}

/// Callback invoked when a request matches a Wasm trigger.
///
/// Receives `(runtime_id, method, path, body)` as owned strings and returns a
/// [`WasmInvokeFuture`]; that future resolves to `Ok(response_body)` or
/// `Err(message)`. The callback is `Send + Sync` and wrapped in an [`Arc`] so
/// it can be cloned into every connection task.
pub type WasmInvoker =
    Arc<dyn Fn(String, String, String, String) -> WasmInvokeFuture + Send + Sync>;

/// Future type returned by the Wasm invoker.
///
/// A pinned, boxed, `Send` future whose output is `Result<String, String>`:
/// the `Ok` and `Err` payloads are both plain strings.
pub type WasmInvokeFuture =
    std::pin::Pin<Box<dyn std::future::Future<Output = Result<String, String>> + Send>>;

/// Shared Wasm trigger table type.
///
/// A list of [`WasmTrigger`]s behind an async `RwLock` in an [`Arc`], so the
/// table can be cloned into connection tasks and updated while the proxy runs.
pub type SharedWasmTriggers = Arc<RwLock<Vec<WasmTrigger>>>;

/// Bind `0.0.0.0:{port}` and serve proxy traffic on it without a fallback.
///
/// The listener terminates TLS itself when `tls_acceptor` is `Some` and speaks
/// plain HTTP otherwise. The future resolves with an error if the socket
/// cannot be bound, and otherwise only when the accept loop returns.
pub async fn run_proxy(
    route_table: Arc<RwLock<HashMap<String, Vec<RouteTarget>>>>,
    wasm_triggers: SharedWasmTriggers,
    wasm_invoker: Option<WasmInvoker>,
    port: u16,
    tls_acceptor: Option<tokio_rustls::TlsAcceptor>,
    acme_manager: Option<AcmeManager>,
) -> anyhow::Result<()> {
    // Label used only in the startup log line.
    let proto = tls_acceptor.as_ref().map_or("HTTP", |_| "HTTPS");

    let addr = format!("0.0.0.0:{port}");
    let socket = TcpListener::bind(&addr).await?;
    info!("Reverse proxy listening on {addr} ({proto})");

    let serving = serve_loop(
        socket,
        route_table,
        wasm_triggers,
        wasm_invoker,
        tls_acceptor,
        acme_manager,
    );
    serving.await
}

/// Bind `0.0.0.0:{port}` and serve proxy traffic, optionally with a fallback.
///
/// Behaves like [`run_proxy`], except that `fallback` is handed through to the
/// accept loop. The future resolves with an error if the socket cannot be
/// bound, and otherwise only when the accept loop returns.
#[allow(clippy::too_many_arguments)]
pub async fn run_proxy_with_fallback(
    route_table: Arc<RwLock<HashMap<String, Vec<RouteTarget>>>>,
    wasm_triggers: SharedWasmTriggers,
    wasm_invoker: Option<WasmInvoker>,
    port: u16,
    tls_acceptor: Option<tokio_rustls::TlsAcceptor>,
    acme_manager: Option<AcmeManager>,
    fallback: Option<FallbackConfig>,
) -> anyhow::Result<()> {
    // Label used only in the startup log line.
    let proto = match &tls_acceptor {
        Some(_) => "HTTPS",
        None => "HTTP",
    };

    let addr = format!("0.0.0.0:{port}");
    let socket = TcpListener::bind(&addr).await?;
    info!("Reverse proxy listening on {addr} ({proto})");

    let serving = serve_loop_with_fallback(
        socket,
        route_table,
        wasm_triggers,
        wasm_invoker,
        tls_acceptor,
        acme_manager,
        fallback,
    );
    serving.await
}

/// Shared dynamic cert resolver for hot-provisioning.
///
/// An [`Arc`] around [`acme::DynCertResolver`]. The ACME entry points install
/// one clone into the HTTPS listener's TLS configuration and return another
/// to the caller.
pub type SharedCertResolver = Arc<acme::DynCertResolver>;

/// Run HTTP on port 80 (for ACME challenges + redirect) and HTTPS on port 443.
///
/// Automatically provisions certs for all given domains via Let's Encrypt.
/// Returns a `SharedCertResolver` that can be used to hot-provision certs
/// for new domains added later via `orca deploy`.
pub async fn run_proxy_with_acme(
    route_table: Arc<RwLock<HashMap<String, Vec<RouteTarget>>>>,
    wasm_triggers: SharedWasmTriggers,
    wasm_invoker: Option<WasmInvoker>,
    acme_manager: AcmeManager,
    domains: Vec<String>,
) -> anyhow::Result<SharedCertResolver> {
    run_proxy_with_acme_and_fallback(
        route_table,
        wasm_triggers,
        wasm_invoker,
        acme_manager,
        domains,
        None,
    )
    .await
}

/// Run HTTP+HTTPS with ACME and optional fallback to another reverse proxy.
#[allow(clippy::too_many_arguments)]
pub async fn run_proxy_with_acme_and_fallback(
    route_table: Arc<RwLock<HashMap<String, Vec<RouteTarget>>>>,
    wasm_triggers: SharedWasmTriggers,
    wasm_invoker: Option<WasmInvoker>,
    acme_manager: AcmeManager,
    domains: Vec<String>,
    fallback: Option<FallbackConfig>,
) -> anyhow::Result<SharedCertResolver> {
    let resolver = Arc::new(acme::DynCertResolver::new());

    let acme_mgr = acme_manager.clone();
    let routes_clone = route_table.clone();
    let triggers_clone = wasm_triggers.clone();
    let invoker_clone = wasm_invoker.clone();
    let fallback_http = fallback.clone();
    let fallback_tls = fallback.clone();

    // Start HTTP on port 80 first (needed for ACME challenge validation)
    let http_handle = tokio::spawn({
        let acme = acme_mgr.clone();
        let routes = routes_clone.clone();
        let triggers = triggers_clone.clone();
        let invoker = invoker_clone.clone();
        async move {
            if let Err(e) = run_proxy_with_fallback(
                routes,
                triggers,
                invoker,
                80,
                None,
                Some(acme),
                fallback_http,
            )
            .await
            {
                error!("HTTP listener failed: {e}");
            }
        }
    });

    // Provision certs for initial domains, then start HTTPS with SNI resolver
    let resolver_clone = resolver.clone();
    let https_handle = tokio::spawn(async move {
        tokio::time::sleep(std::time::Duration::from_millis(100)).await;

        // Provision all initial domain certs
        for domain in &domains {
            if let Err(e) = acme_mgr
                .ensure_cert_for_resolver(domain, &resolver_clone)
                .await
            {
                error!(domain = %domain, error = %e, "Failed to provision cert");
            }
        }

        // Build TlsAcceptor with SNI resolver for multi-domain support
        let config = rustls::ServerConfig::builder()
            .with_no_client_auth()
            .with_cert_resolver(resolver_clone);

        let acceptor = tokio_rustls::TlsAcceptor::from(Arc::new(config));
        info!(
            "Starting HTTPS with SNI resolver ({} domains)",
            domains.len()
        );

        let routes = routes_clone;
        let triggers = triggers_clone;
        let invoker = invoker_clone;
        if let Err(e) = run_proxy_with_fallback(
            routes,
            triggers,
            invoker,
            443,
            Some(acceptor),
            Some(acme_mgr),
            fallback_tls,
        )
        .await
        {
            error!("HTTPS listener failed: {e}");
        }
    });

    // Don't block — return the resolver so the control plane can hot-add certs.
    // The HTTP and HTTPS listeners run in the background.
    tokio::spawn(async move {
        tokio::select! {
            _ = http_handle => warn!("HTTP listener exited"),
            _ = https_handle => warn!("HTTPS listener exited"),
        }
    });

    Ok(resolver)
}

/// Core accept loop shared by HTTP and HTTPS listeners.
async fn serve_loop(
    listener: TcpListener,
    route_table: Arc<RwLock<HashMap<String, Vec<RouteTarget>>>>,
    wasm_triggers: SharedWasmTriggers,
    wasm_invoker: Option<WasmInvoker>,
    tls_acceptor: Option<tokio_rustls::TlsAcceptor>,
    acme_manager: Option<AcmeManager>,
) -> anyhow::Result<()> {
    serve_loop_with_fallback(
        listener,
        route_table,
        wasm_triggers,
        wasm_invoker,
        tls_acceptor,
        acme_manager,
        None,
    )
    .await
}

/// Serve loop variant with fallback support for SNI passthrough and HTTP forwarding.
#[allow(clippy::too_many_arguments)]
pub(crate) async fn serve_loop_with_fallback(
    listener: TcpListener,
    route_table: Arc<RwLock<HashMap<String, Vec<RouteTarget>>>>,
    wasm_triggers: SharedWasmTriggers,
    wasm_invoker: Option<WasmInvoker>,
    tls_acceptor: Option<tokio_rustls::TlsAcceptor>,
    acme_manager: Option<AcmeManager>,
    fallback: Option<FallbackConfig>,
) -> anyhow::Result<()> {
    let counter = Arc::new(AtomicUsize::new(0));
    let client = Arc::new(
        reqwest::Client::builder()
            .no_proxy()
            .redirect(reqwest::redirect::Policy::none())
            // Timeouts are mandatory: without them, a hung upstream (slow
            // backend, dead fallback, slowloris) parks the per-request task
            // forever. We accept many in-flight tasks but every one must
            // eventually complete so the proxy can recover without a restart.
            .connect_timeout(std::time::Duration::from_secs(10))
            .timeout(std::time::Duration::from_secs(300))
            .pool_idle_timeout(std::time::Duration::from_secs(90))
            .build()
            .expect("failed to build HTTP client"),
    );
    let acme = acme_manager.map(Arc::new);
    let is_tls = tls_acceptor.is_some();
    let rate_limiter = RateLimiter::new();

    let fallback = Arc::new(fallback);
    loop {
        let (stream, peer) = match listener.accept().await {
            Ok(conn) => conn,
            Err(e) => {
                warn!("Proxy accept error: {e}");
                continue;
            }
        };

        let routes = route_table.clone();
        let triggers = wasm_triggers.clone();
        let invoker = wasm_invoker.clone();
        let counter = counter.clone();
        let client = client.clone();
        let acme = acme.clone();
        let tls = tls_acceptor.clone();
        let rl = rate_limiter.clone();
        let fb = fallback.clone();
        let routes_for_sni = routes.clone();

        let fb_for_service = fb.clone();
        tokio::spawn(async move {
            let service = service_fn(move |req: Request<Incoming>| {
                let routes = routes.clone();
                let triggers = triggers.clone();
                let invoker = invoker.clone();
                let counter = counter.clone();
                let client = client.clone();
                let acme = acme.clone();
                let rl = rl.clone();
                let fb = fb_for_service.clone();
                async move {
                    if let Some(resp) = handle_acme_challenge(&req, acme.as_deref()).await {
                        return Ok(resp);
                    }
                    handle_request(
                        req,
                        &routes,
                        &triggers,
                        invoker.as_ref(),
                        &counter,
                        &client,
                        is_tls,
                        &rl,
                        peer,
                        fb.as_ref().as_ref(),
                    )
                    .await
                }
            });
            if let Some(acceptor) = tls {
                let mut stream = stream;
                // Peek SNI to decide between local TLS termination and pass-through
                let sni = sni::peek_sni(&mut stream).await;
                let should_passthrough = if let Some(ref host) = sni {
                    let routes_lock = routes_for_sni.read().await;
                    let known = routes_lock.contains_key(host);
                    drop(routes_lock);
                    !known && fb.as_ref().as_ref().and_then(|f| f.tls.as_ref()).is_some()
                } else {
                    false
                };

                if should_passthrough {
                    let target = fb
                        .as_ref()
                        .as_ref()
                        .and_then(|f| f.tls.clone())
                        .expect("checked above");
                    debug!(?sni, %target, "SNI passthrough");
                    match tokio::net::TcpStream::connect(&target).await {
                        Ok(mut backend) => {
                            if let Err(e) =
                                tokio::io::copy_bidirectional(&mut stream, &mut backend).await
                            {
                                debug!("Passthrough copy error from {peer}: {e}");
                            }
                        }
                        Err(e) => warn!("Failed to connect to TLS fallback {target}: {e}"),
                    }
                    return;
                }

                match acceptor.accept(stream).await {
                    Ok(tls_stream) => {
                        let io = TokioIo::new(tls_stream);
                        if let Err(e) = http1::Builder::new()
                            .serve_connection(io, service)
                            .with_upgrades()
                            .await
                        {
                            debug!("TLS proxy error from {peer}: {e}");
                        }
                    }
                    Err(e) => debug!("TLS handshake failed from {peer}: {e}"),
                }
            } else {
                let io = TokioIo::new(stream);
                if let Err(e) = http1::Builder::new()
                    .serve_connection(io, service)
                    .with_upgrades()
                    .await
                {
                    debug!("Proxy connection error from {peer}: {e}");
                }
            }
        });
    }
}