1pub mod logging;
2pub use logging::{logger, HttpLogData, LogOption};
3use tokio_tungstenite::tungstenite::protocol::WebSocketConfig;
4pub mod onion;
5mod router;
6mod service;
7use crate::{
8 client::{
9 load_balancing::{self, LoadBalancer, Strategy},
10 worker::{WorkerConfigs, WorkerType},
11 ExclusiveBody,
12 },
13 error::{FaucetError, FaucetResult},
14 leak,
15 shutdown::ShutdownSignal,
16};
17use hyper::{body::Incoming, server::conn::http1, service::service_fn, Request};
18use hyper_util::rt::TokioIo;
19use onion::{Service, ServiceBuilder};
20use service::{AddStateLayer, ProxyService};
21use std::{
22 ffi::{OsStr, OsString},
23 net::SocketAddr,
24 num::NonZeroUsize,
25 path::{Path, PathBuf},
26 pin::pin,
27 sync::Arc,
28};
29use tokio::net::TcpListener;
30
31pub use router::RouterConfig;
32
33use self::{logging::LogService, service::AddStateService};
34
35fn determine_strategy(server_type: WorkerType, strategy: Option<Strategy>) -> Strategy {
36 match server_type {
37 WorkerType::Plumber =>
38 strategy.unwrap_or_else(|| {
39 log::debug!(target: "faucet", "No load balancing strategy specified. Defaulting to round robin for plumber.");
40 Strategy::RoundRobin
41 }),
42 WorkerType::Shiny | WorkerType::QuartoShiny => match strategy {
43 None => {
44 log::debug!(target: "faucet", "No load balancing strategy specified. Defaulting to IP hash for shiny.");
45 Strategy::IpHash
46 },
47 Some(Strategy::Rps) => {
48 log::debug!(target: "faucet", "RPS load balancing strategy specified for shiny, switching to IP hash.");
49 Strategy::IpHash
50 },
51 Some(Strategy::CookieHash) => Strategy::CookieHash,
52 Some(Strategy::RoundRobin) => {
53 log::debug!(target: "faucet", "Round robin load balancing strategy specified for shiny, switching to IP hash.");
54 Strategy::IpHash
55 },
56 Some(Strategy::IpHash) => Strategy::IpHash,
57 },
58 #[cfg(test)]
59 WorkerType::Dummy => {
60 log::debug!(target: "faucet", "WorkerType is Dummy, defaulting strategy to RoundRobin for tests.");
61 Strategy::RoundRobin
62 }
63 }
64}
65
66pub struct FaucetServerBuilder {
67 strategy: Option<Strategy>,
68 bind: Option<SocketAddr>,
69 n_workers: Option<NonZeroUsize>,
70 server_type: Option<WorkerType>,
71 workdir: Option<PathBuf>,
72 extractor: Option<load_balancing::IpExtractor>,
73 rscript: Option<OsString>,
74 app_dir: Option<String>,
75 quarto: Option<OsString>,
76 qmd: Option<PathBuf>,
77 route: Option<String>,
78 max_rps: Option<f64>,
79}
80
81impl FaucetServerBuilder {
82 pub fn new() -> Self {
83 FaucetServerBuilder {
84 strategy: None,
85 bind: None,
86 n_workers: None,
87 server_type: None,
88 workdir: None,
89 extractor: None,
90 rscript: None,
91 app_dir: None,
92 route: None,
93 quarto: None,
94 qmd: None,
95 max_rps: None,
96 }
97 }
98 pub fn app_dir(mut self, app_dir: Option<impl AsRef<str>>) -> Self {
99 self.app_dir = app_dir.map(|s| s.as_ref().into());
100 self
101 }
102 pub fn strategy(mut self, strategy: Option<Strategy>) -> Self {
103 log::debug!(target: "faucet", "Using load balancing strategy: {strategy:?}");
104 self.strategy = strategy;
105 self
106 }
107 pub fn bind(mut self, bind: SocketAddr) -> Self {
108 log::debug!(target: "faucet", "Will bind to: {bind}");
109 self.bind = Some(bind);
110 self
111 }
112 pub fn extractor(mut self, extractor: load_balancing::IpExtractor) -> Self {
113 log::debug!(target: "faucet", "Using IP extractor: {extractor:?}");
114 self.extractor = Some(extractor);
115 self
116 }
117 pub fn workers(mut self, n: usize) -> Self {
118 log::debug!(target: "faucet", "Will spawn {n} workers");
119 self.n_workers = match n.try_into() {
120 Ok(n) => Some(n),
121 Err(_) => {
122 log::error!(target: "faucet", "Number of workers must be greater than 0");
123 std::process::exit(1);
124 }
125 };
126 self
127 }
128 pub fn server_type(mut self, server_type: WorkerType) -> Self {
129 log::debug!(target: "faucet", "Using worker type: {server_type:?}");
130 self.server_type = Some(server_type);
131 self
132 }
133 pub fn workdir(mut self, workdir: impl AsRef<Path>) -> Self {
134 log::debug!(target: "faucet", "Using workdir: {:?}", workdir.as_ref());
135 self.workdir = Some(workdir.as_ref().into());
136 self
137 }
138 pub fn rscript(mut self, rscript: impl AsRef<OsStr>) -> Self {
139 log::debug!(target: "faucet", "Using Rscript command: {:?}", rscript.as_ref());
140 self.rscript = Some(rscript.as_ref().into());
141 self
142 }
143 pub fn quarto(mut self, quarto: impl AsRef<OsStr>) -> Self {
144 log::debug!(target: "faucet", "Using quarto command: {:?}", quarto.as_ref());
145 self.quarto = Some(quarto.as_ref().into());
146 self
147 }
148 pub fn qmd(mut self, qmd: Option<impl AsRef<Path>>) -> Self {
149 self.qmd = qmd.map(|s| s.as_ref().into());
150 self
151 }
152 pub fn route(mut self, route: String) -> Self {
153 self.route = Some(route);
154 self
155 }
156 pub fn max_rps(mut self, max_rps: Option<f64>) -> Self {
157 self.max_rps = max_rps;
158 self
159 }
160 pub fn build(self) -> FaucetResult<FaucetServerConfig> {
161 let server_type = self
162 .server_type
163 .ok_or(FaucetError::MissingArgument("server_type"))?;
164 let strategy = determine_strategy(server_type, self.strategy);
165 let bind = self.bind;
166 let n_workers = self.n_workers.unwrap_or_else(|| {
167 log::debug!(target: "faucet", "No number of workers specified. Defaulting to the number of logical cores.");
168 num_cpus::get().try_into().expect("num_cpus::get() returned 0")
169 });
170 let workdir = self.workdir
171 .map(|wd| leak!(wd, Path))
172 .unwrap_or_else(|| {
173 log::debug!(target: "faucet", "No workdir specified. Defaulting to the current directory.");
174 Path::new(".")
175 });
176 let rscript = self.rscript.map(|wd| leak!(wd, OsStr)).unwrap_or_else(|| {
177 log::debug!(target: "faucet", "No Rscript command specified. Defaulting to `Rscript`.");
178 OsStr::new("Rscript")
179 });
180 let extractor = self.extractor.unwrap_or_else(|| {
181 log::debug!(target: "faucet", "No IP extractor specified. Defaulting to client address.");
182 load_balancing::IpExtractor::ClientAddr
183 });
184 let app_dir = self.app_dir.map(|app_dir| leak!(app_dir, str));
185 let qmd = self.qmd.map(|qmd| leak!(qmd, Path));
186 let quarto = self.quarto.map(|qmd| leak!(qmd, OsStr)).unwrap_or_else(|| {
187 log::debug!(target: "faucet", "No quarto command specified. Defaulting to `quarto`.");
188 OsStr::new("quarto")
189 });
190 let route = self.route.map(|r| -> &'static _ { leak!(r) });
191 let max_rps = self.max_rps;
192 Ok(FaucetServerConfig {
193 strategy,
194 bind,
195 n_workers,
196 server_type,
197 workdir,
198 extractor,
199 rscript,
200 app_dir,
201 route,
202 quarto,
203 qmd,
204 max_rps,
205 })
206 }
207}
208
209impl Default for FaucetServerBuilder {
210 fn default() -> Self {
211 Self::new()
212 }
213}
214
215#[derive(Clone)]
216pub struct FaucetServerConfig {
217 pub strategy: Strategy,
218 pub bind: Option<SocketAddr>,
219 pub n_workers: NonZeroUsize,
220 pub server_type: WorkerType,
221 pub workdir: &'static Path,
222 pub extractor: load_balancing::IpExtractor,
223 pub rscript: &'static OsStr,
224 pub quarto: &'static OsStr,
225 pub app_dir: Option<&'static str>,
226 pub route: Option<&'static str>,
227 pub qmd: Option<&'static Path>,
228 pub max_rps: Option<f64>,
229}
230
231impl FaucetServerConfig {
232 pub async fn run(
233 self,
234 shutdown: &'static ShutdownSignal,
235 websocket_config: &'static WebSocketConfig,
236 ) -> FaucetResult<()> {
237 let mut workers = WorkerConfigs::new(self.clone(), shutdown).await?;
238 let load_balancer = LoadBalancer::new(
239 self.strategy,
240 self.extractor,
241 &workers.workers,
242 self.max_rps,
243 )
244 .await?;
245 let bind = self.bind.ok_or(FaucetError::MissingArgument("bind"))?;
246
247 let load_balancer = load_balancer.clone();
248 let service = Arc::new(
249 ServiceBuilder::new(ProxyService {
250 shutdown,
251 websocket_config,
252 })
253 .layer(logging::LogLayer {})
254 .layer(AddStateLayer::new(load_balancer))
255 .build(),
256 );
257
258 let listener = TcpListener::bind(bind).await?;
260 log::info!(target: "faucet", "Listening on http://{bind}");
261 let main_loop = || async {
262 loop {
263 match listener.accept().await {
264 Err(e) => {
265 log::error!(target: "faucet", "Unable to accept TCP connection: {e}");
266 return;
267 }
268 Ok((tcp, client_addr)) => {
269 let tcp = TokioIo::new(tcp);
270 log::debug!(target: "faucet", "Accepted TCP connection from {client_addr}");
271
272 let service = service.clone();
273
274 tokio::task::spawn(async move {
275 let mut conn = http1::Builder::new()
276 .half_close(true)
277 .serve_connection(
278 tcp,
279 service_fn(|req: Request<Incoming>| {
280 service.call(req, Some(client_addr.ip()))
281 }),
282 )
283 .with_upgrades();
284
285 let conn = pin!(&mut conn);
286
287 tokio::select! {
288 result = conn => {
289 if let Err(e) = result {
290 log::error!(target: "faucet", "Connection error: {e:?}");
291 }
292 }
293 _ = shutdown.wait() => ()
294 }
295 });
296 }
297 };
298 }
299 };
300
301 tokio::select! {
303 _ = shutdown.wait() => (),
304 _ = main_loop() => (),
305 }
306
307 log::debug!("Main loop ended!");
308
309 for worker in &mut workers.workers {
310 log::debug!("Waiting for {} to finish", worker.target);
311 worker.wait_until_done().await;
312 }
313
314 log::debug!("All workers are finished!");
315
316 FaucetResult::Ok(())
317 }
318 pub async fn extract_service(
319 self,
320 shutdown: &'static ShutdownSignal,
321 websocket_config: &'static WebSocketConfig,
322 ) -> FaucetResult<(FaucetServerService, WorkerConfigs)> {
323 let workers = WorkerConfigs::new(self.clone(), shutdown).await?;
324 let load_balancer = LoadBalancer::new(
325 self.strategy,
326 self.extractor,
327 &workers.workers,
328 self.max_rps,
329 )
330 .await?;
331 let service = Arc::new(
332 ServiceBuilder::new(ProxyService {
333 shutdown,
334 websocket_config,
335 })
336 .layer(logging::LogLayer {})
337 .layer(AddStateLayer::new(load_balancer))
338 .build(),
339 );
340
341 Ok((FaucetServerService { inner: service }, workers))
342 }
343}
344
345pub struct FaucetServerService {
346 inner: Arc<AddStateService<LogService<ProxyService>>>,
347}
348
349impl Clone for FaucetServerService {
350 fn clone(&self) -> Self {
351 FaucetServerService {
352 inner: Arc::clone(&self.inner),
353 }
354 }
355}
356
357impl Service<hyper::Request<Incoming>> for FaucetServerService {
358 type Error = FaucetError;
359 type Response = hyper::Response<ExclusiveBody>;
360 async fn call(
361 &self,
362 req: hyper::Request<Incoming>,
363 ip_addr: Option<std::net::IpAddr>,
364 ) -> Result<Self::Response, Self::Error> {
365 self.inner.call(req, ip_addr).await
366 }
367}