1pub mod logging;
2pub use logging::{logger, HttpLogData, LogOption};
3pub mod onion;
4mod router;
5mod service;
6use crate::{
7 client::{
8 load_balancing::{self, LoadBalancer, Strategy},
9 worker::{WorkerConfigs, WorkerType},
10 ExclusiveBody,
11 },
12 error::{FaucetError, FaucetResult},
13 leak,
14 shutdown::ShutdownSignal,
15};
16use hyper::{body::Incoming, server::conn::http1, service::service_fn, Request};
17use hyper_util::rt::TokioIo;
18use onion::{Service, ServiceBuilder};
19use service::{AddStateLayer, ProxyService};
20use std::{
21 ffi::{OsStr, OsString},
22 net::SocketAddr,
23 num::NonZeroUsize,
24 path::{Path, PathBuf},
25 pin::pin,
26 sync::Arc,
27};
28use tokio::net::TcpListener;
29
30pub use router::RouterConfig;
31
32use self::{logging::LogService, service::AddStateService};
33
34fn determine_strategy(server_type: WorkerType, strategy: Option<Strategy>) -> Strategy {
35 match server_type {
36 WorkerType::Plumber =>
37 strategy.unwrap_or_else(|| {
38 log::debug!(target: "faucet", "No load balancing strategy specified. Defaulting to round robin for plumber.");
39 Strategy::RoundRobin
40 }),
41 WorkerType::Shiny | WorkerType::QuartoShiny => match strategy {
42 None => {
43 log::debug!(target: "faucet", "No load balancing strategy specified. Defaulting to IP hash for shiny.");
44 Strategy::IpHash
45 },
46 Some(Strategy::Rps) => {
47 log::debug!(target: "faucet", "RPS load balancing strategy specified for shiny, switching to IP hash.");
48 Strategy::IpHash
49 },
50 Some(Strategy::CookieHash) => Strategy::CookieHash,
51 Some(Strategy::RoundRobin) => {
52 log::debug!(target: "faucet", "Round robin load balancing strategy specified for shiny, switching to IP hash.");
53 Strategy::IpHash
54 },
55 Some(Strategy::IpHash) => Strategy::IpHash,
56 },
57 #[cfg(test)]
58 WorkerType::Dummy => {
59 log::debug!(target: "faucet", "WorkerType is Dummy, defaulting strategy to RoundRobin for tests.");
60 Strategy::RoundRobin
61 }
62 }
63}
64
65pub struct FaucetServerBuilder {
66 strategy: Option<Strategy>,
67 bind: Option<SocketAddr>,
68 n_workers: Option<NonZeroUsize>,
69 server_type: Option<WorkerType>,
70 workdir: Option<PathBuf>,
71 extractor: Option<load_balancing::IpExtractor>,
72 rscript: Option<OsString>,
73 app_dir: Option<String>,
74 quarto: Option<OsString>,
75 qmd: Option<PathBuf>,
76 route: Option<String>,
77 max_rps: Option<f64>,
78}
79
80impl FaucetServerBuilder {
81 pub fn new() -> Self {
82 FaucetServerBuilder {
83 strategy: None,
84 bind: None,
85 n_workers: None,
86 server_type: None,
87 workdir: None,
88 extractor: None,
89 rscript: None,
90 app_dir: None,
91 route: None,
92 quarto: None,
93 qmd: None,
94 max_rps: None,
95 }
96 }
97 pub fn app_dir(mut self, app_dir: Option<impl AsRef<str>>) -> Self {
98 self.app_dir = app_dir.map(|s| s.as_ref().into());
99 self
100 }
101 pub fn strategy(mut self, strategy: Option<Strategy>) -> Self {
102 log::debug!(target: "faucet", "Using load balancing strategy: {strategy:?}");
103 self.strategy = strategy;
104 self
105 }
106 pub fn bind(mut self, bind: SocketAddr) -> Self {
107 log::debug!(target: "faucet", "Will bind to: {bind}");
108 self.bind = Some(bind);
109 self
110 }
111 pub fn extractor(mut self, extractor: load_balancing::IpExtractor) -> Self {
112 log::debug!(target: "faucet", "Using IP extractor: {extractor:?}");
113 self.extractor = Some(extractor);
114 self
115 }
116 pub fn workers(mut self, n: usize) -> Self {
117 log::debug!(target: "faucet", "Will spawn {n} workers");
118 self.n_workers = match n.try_into() {
119 Ok(n) => Some(n),
120 Err(_) => {
121 log::error!(target: "faucet", "Number of workers must be greater than 0");
122 std::process::exit(1);
123 }
124 };
125 self
126 }
127 pub fn server_type(mut self, server_type: WorkerType) -> Self {
128 log::debug!(target: "faucet", "Using worker type: {server_type:?}");
129 self.server_type = Some(server_type);
130 self
131 }
132 pub fn workdir(mut self, workdir: impl AsRef<Path>) -> Self {
133 log::debug!(target: "faucet", "Using workdir: {:?}", workdir.as_ref());
134 self.workdir = Some(workdir.as_ref().into());
135 self
136 }
137 pub fn rscript(mut self, rscript: impl AsRef<OsStr>) -> Self {
138 log::debug!(target: "faucet", "Using Rscript command: {:?}", rscript.as_ref());
139 self.rscript = Some(rscript.as_ref().into());
140 self
141 }
142 pub fn quarto(mut self, quarto: impl AsRef<OsStr>) -> Self {
143 log::debug!(target: "faucet", "Using quarto command: {:?}", quarto.as_ref());
144 self.quarto = Some(quarto.as_ref().into());
145 self
146 }
147 pub fn qmd(mut self, qmd: Option<impl AsRef<Path>>) -> Self {
148 self.qmd = qmd.map(|s| s.as_ref().into());
149 self
150 }
151 pub fn route(mut self, route: String) -> Self {
152 self.route = Some(route);
153 self
154 }
155 pub fn max_rps(mut self, max_rps: Option<f64>) -> Self {
156 self.max_rps = max_rps;
157 self
158 }
159 pub fn build(self) -> FaucetResult<FaucetServerConfig> {
160 let server_type = self
161 .server_type
162 .ok_or(FaucetError::MissingArgument("server_type"))?;
163 let strategy = determine_strategy(server_type, self.strategy);
164 let bind = self.bind;
165 let n_workers = self.n_workers.unwrap_or_else(|| {
166 log::debug!(target: "faucet", "No number of workers specified. Defaulting to the number of logical cores.");
167 num_cpus::get().try_into().expect("num_cpus::get() returned 0")
168 });
169 let workdir = self.workdir
170 .map(|wd| leak!(wd, Path))
171 .unwrap_or_else(|| {
172 log::debug!(target: "faucet", "No workdir specified. Defaulting to the current directory.");
173 Path::new(".")
174 });
175 let rscript = self.rscript.map(|wd| leak!(wd, OsStr)).unwrap_or_else(|| {
176 log::debug!(target: "faucet", "No Rscript command specified. Defaulting to `Rscript`.");
177 OsStr::new("Rscript")
178 });
179 let extractor = self.extractor.unwrap_or_else(|| {
180 log::debug!(target: "faucet", "No IP extractor specified. Defaulting to client address.");
181 load_balancing::IpExtractor::ClientAddr
182 });
183 let app_dir = self.app_dir.map(|app_dir| leak!(app_dir, str));
184 let qmd = self.qmd.map(|qmd| leak!(qmd, Path));
185 let quarto = self.quarto.map(|qmd| leak!(qmd, OsStr)).unwrap_or_else(|| {
186 log::debug!(target: "faucet", "No quarto command specified. Defaulting to `quarto`.");
187 OsStr::new("quarto")
188 });
189 let route = self.route.map(|r| -> &'static _ { leak!(r) });
190 let max_rps = self.max_rps;
191 Ok(FaucetServerConfig {
192 strategy,
193 bind,
194 n_workers,
195 server_type,
196 workdir,
197 extractor,
198 rscript,
199 app_dir,
200 route,
201 quarto,
202 qmd,
203 max_rps,
204 })
205 }
206}
207
208impl Default for FaucetServerBuilder {
209 fn default() -> Self {
210 Self::new()
211 }
212}
213
214#[derive(Clone)]
215pub struct FaucetServerConfig {
216 pub strategy: Strategy,
217 pub bind: Option<SocketAddr>,
218 pub n_workers: NonZeroUsize,
219 pub server_type: WorkerType,
220 pub workdir: &'static Path,
221 pub extractor: load_balancing::IpExtractor,
222 pub rscript: &'static OsStr,
223 pub quarto: &'static OsStr,
224 pub app_dir: Option<&'static str>,
225 pub route: Option<&'static str>,
226 pub qmd: Option<&'static Path>,
227 pub max_rps: Option<f64>,
228}
229
230impl FaucetServerConfig {
231 pub async fn run(self, shutdown: &'static ShutdownSignal) -> FaucetResult<()> {
232 let mut workers = WorkerConfigs::new(self.clone(), shutdown).await?;
233 let load_balancer = LoadBalancer::new(
234 self.strategy,
235 self.extractor,
236 &workers.workers,
237 self.max_rps,
238 )
239 .await?;
240 let bind = self.bind.ok_or(FaucetError::MissingArgument("bind"))?;
241
242 let load_balancer = load_balancer.clone();
243 let service = Arc::new(
244 ServiceBuilder::new(ProxyService { shutdown })
245 .layer(logging::LogLayer {})
246 .layer(AddStateLayer::new(load_balancer))
247 .build(),
248 );
249
250 let listener = TcpListener::bind(bind).await?;
252 log::info!(target: "faucet", "Listening on http://{bind}");
253 let main_loop = || async {
254 loop {
255 match listener.accept().await {
256 Err(e) => {
257 log::error!(target: "faucet", "Unable to accept TCP connection: {e}");
258 return;
259 }
260 Ok((tcp, client_addr)) => {
261 let tcp = TokioIo::new(tcp);
262 log::debug!(target: "faucet", "Accepted TCP connection from {client_addr}");
263
264 let service = service.clone();
265
266 tokio::task::spawn(async move {
267 let mut conn = http1::Builder::new()
268 .half_close(true)
269 .serve_connection(
270 tcp,
271 service_fn(|req: Request<Incoming>| {
272 service.call(req, Some(client_addr.ip()))
273 }),
274 )
275 .with_upgrades();
276
277 let conn = pin!(&mut conn);
278
279 tokio::select! {
280 result = conn => {
281 if let Err(e) = result {
282 log::error!(target: "faucet", "Connection error: {e:?}");
283 }
284 }
285 _ = shutdown.wait() => ()
286 }
287 });
288 }
289 };
290 }
291 };
292
293 tokio::select! {
295 _ = shutdown.wait() => (),
296 _ = main_loop() => (),
297 }
298
299 log::debug!("Main loop ended!");
300
301 for worker in &mut workers.workers {
302 log::debug!("Waiting for {} to finish", worker.target);
303 worker.wait_until_done().await;
304 }
305
306 log::debug!("All workers are finished!");
307
308 FaucetResult::Ok(())
309 }
310 pub async fn extract_service(
311 self,
312 shutdown: &'static ShutdownSignal,
313 ) -> FaucetResult<(FaucetServerService, WorkerConfigs)> {
314 let workers = WorkerConfigs::new(self.clone(), shutdown).await?;
315 let load_balancer = LoadBalancer::new(
316 self.strategy,
317 self.extractor,
318 &workers.workers,
319 self.max_rps,
320 )
321 .await?;
322 let service = Arc::new(
323 ServiceBuilder::new(ProxyService { shutdown })
324 .layer(logging::LogLayer {})
325 .layer(AddStateLayer::new(load_balancer))
326 .build(),
327 );
328
329 Ok((FaucetServerService { inner: service }, workers))
330 }
331}
332
333pub struct FaucetServerService {
334 inner: Arc<AddStateService<LogService<ProxyService>>>,
335}
336
337impl Clone for FaucetServerService {
338 fn clone(&self) -> Self {
339 FaucetServerService {
340 inner: Arc::clone(&self.inner),
341 }
342 }
343}
344
345impl Service<hyper::Request<Incoming>> for FaucetServerService {
346 type Error = FaucetError;
347 type Response = hyper::Response<ExclusiveBody>;
348 async fn call(
349 &self,
350 req: hyper::Request<Incoming>,
351 ip_addr: Option<std::net::IpAddr>,
352 ) -> Result<Self::Response, Self::Error> {
353 self.inner.call(req, ip_addr).await
354 }
355}