1mod handler;
7mod routing;
8
9use std::collections::HashMap;
10use std::sync::Arc;
11use std::sync::atomic::AtomicUsize;
12
13use hyper::Request;
14use hyper::body::Incoming;
15use hyper::server::conn::http1;
16use hyper::service::service_fn;
17use hyper_util::rt::TokioIo;
18use tokio::net::TcpListener;
19use tokio::sync::RwLock;
20use tracing::{debug, info, warn};
21
22use handler::handle_request;
23
24#[derive(Debug, Clone)]
26pub struct RouteTarget {
27 pub address: String,
29 pub service_name: String,
31}
32
33#[derive(Debug, Clone)]
35pub struct WasmTrigger {
36 pub pattern: String,
38 pub runtime_id: String,
40 pub service_name: String,
42}
43
44pub type WasmInvoker =
47 Arc<dyn Fn(String, String, String, String) -> WasmInvokeFuture + Send + Sync>;
48
49pub type WasmInvokeFuture =
51 std::pin::Pin<Box<dyn std::future::Future<Output = Result<String, String>> + Send>>;
52
53pub type SharedWasmTriggers = Arc<RwLock<Vec<WasmTrigger>>>;
55
56pub async fn run_proxy(
64 route_table: Arc<RwLock<HashMap<String, Vec<RouteTarget>>>>,
65 wasm_triggers: SharedWasmTriggers,
66 wasm_invoker: Option<WasmInvoker>,
67 port: u16,
68) -> anyhow::Result<()> {
69 let addr = format!("0.0.0.0:{port}");
70 let listener = TcpListener::bind(&addr).await?;
71 info!("Reverse proxy listening on {addr}");
72
73 let counter = Arc::new(AtomicUsize::new(0));
74 let client = Arc::new(reqwest::Client::new());
75
76 loop {
77 let (stream, peer) = match listener.accept().await {
78 Ok(conn) => conn,
79 Err(e) => {
80 warn!("Proxy accept error: {e}");
81 continue;
82 }
83 };
84
85 let routes = route_table.clone();
86 let triggers = wasm_triggers.clone();
87 let invoker = wasm_invoker.clone();
88 let counter = counter.clone();
89 let client = client.clone();
90
91 tokio::spawn(async move {
92 let io = TokioIo::new(stream);
93 let service = service_fn(move |req: Request<Incoming>| {
94 let routes = routes.clone();
95 let triggers = triggers.clone();
96 let invoker = invoker.clone();
97 let counter = counter.clone();
98 let client = client.clone();
99 async move {
100 handle_request(req, &routes, &triggers, invoker.as_ref(), &counter, &client)
101 .await
102 }
103 });
104
105 if let Err(e) = http1::Builder::new().serve_connection(io, service).await {
106 debug!("Proxy connection error from {peer}: {e}");
107 }
108 });
109 }
110}