1mod logging;
2pub use logging::{logger, LogData, LogOption};
3pub mod onion;
4mod router;
5mod service;
6use crate::{
7 client::{
8 load_balancing::{self, LoadBalancer, Strategy},
9 worker::{WorkerType, Workers},
10 ExclusiveBody,
11 },
12 error::{FaucetError, FaucetResult},
13 leak,
14 shutdown::ShutdownSignal,
15 telemetry::{TelemetryManager, TelemetrySender},
16};
17use hyper::{body::Incoming, server::conn::http1, service::service_fn, Request};
18use hyper_util::rt::TokioIo;
19use onion::{Service, ServiceBuilder};
20use service::{AddStateLayer, ProxyService};
21use std::{
22 ffi::{OsStr, OsString},
23 net::SocketAddr,
24 num::NonZeroUsize,
25 path::{Path, PathBuf},
26 pin::pin,
27 sync::Arc,
28};
29use tokio::net::TcpListener;
30
31pub use router::RouterConfig;
32
33use self::{logging::LogService, service::AddStateService};
34
35fn determine_strategy(server_type: WorkerType, strategy: Option<Strategy>) -> Strategy {
36 match server_type {
37 WorkerType::Plumber =>
38 strategy.unwrap_or_else(|| {
39 log::debug!(target: "faucet", "No load balancing strategy specified. Defaulting to round robin for plumber.");
40 Strategy::RoundRobin
41 }),
42 WorkerType::Shiny | WorkerType::QuartoShiny => match strategy {
43 None => {
44 log::debug!(target: "faucet", "No load balancing strategy specified. Defaulting to IP hash for shiny.");
45 Strategy::IpHash
46 },
47 Some(Strategy::CookieHash) => Strategy::CookieHash,
48 Some(Strategy::RoundRobin) => {
49 log::debug!(target: "faucet", "Round robin load balancing strategy specified for shiny, switching to IP hash.");
50 Strategy::IpHash
51 },
52 Some(Strategy::IpHash) => Strategy::IpHash,
53 }
54 }
55}
56
57pub struct FaucetServerBuilder {
58 strategy: Option<Strategy>,
59 bind: Option<SocketAddr>,
60 n_workers: Option<NonZeroUsize>,
61 server_type: Option<WorkerType>,
62 workdir: Option<PathBuf>,
63 extractor: Option<load_balancing::IpExtractor>,
64 rscript: Option<OsString>,
65 app_dir: Option<String>,
66 quarto: Option<OsString>,
67 qmd: Option<PathBuf>,
68 route: Option<String>,
69 telemetry: Option<TelemetrySender>,
70}
71
72impl FaucetServerBuilder {
73 pub fn new() -> Self {
74 FaucetServerBuilder {
75 strategy: None,
76 bind: None,
77 n_workers: None,
78 server_type: None,
79 workdir: None,
80 extractor: None,
81 rscript: None,
82 app_dir: None,
83 route: None,
84 quarto: None,
85 qmd: None,
86 telemetry: None,
87 }
88 }
89 pub fn app_dir(mut self, app_dir: Option<impl AsRef<str>>) -> Self {
90 self.app_dir = app_dir.map(|s| s.as_ref().into());
91 self
92 }
93 pub fn strategy(mut self, strategy: Option<Strategy>) -> Self {
94 log::debug!(target: "faucet", "Using load balancing strategy: {:?}", strategy);
95 self.strategy = strategy;
96 self
97 }
98 pub fn bind(mut self, bind: SocketAddr) -> Self {
99 log::debug!(target: "faucet", "Will bind to: {}", bind);
100 self.bind = Some(bind);
101 self
102 }
103 pub fn extractor(mut self, extractor: load_balancing::IpExtractor) -> Self {
104 log::debug!(target: "faucet", "Using IP extractor: {:?}", extractor);
105 self.extractor = Some(extractor);
106 self
107 }
108 pub fn workers(mut self, n: usize) -> Self {
109 log::debug!(target: "faucet", "Will spawn {} workers", n);
110 self.n_workers = match n.try_into() {
111 Ok(n) => Some(n),
112 Err(_) => {
113 log::error!(target: "faucet", "Number of workers must be greater than 0");
114 std::process::exit(1);
115 }
116 };
117 self
118 }
119 pub fn server_type(mut self, server_type: WorkerType) -> Self {
120 log::debug!(target: "faucet", "Using worker type: {:?}", server_type);
121 self.server_type = Some(server_type);
122 self
123 }
124 pub fn workdir(mut self, workdir: impl AsRef<Path>) -> Self {
125 log::debug!(target: "faucet", "Using workdir: {:?}", workdir.as_ref());
126 self.workdir = Some(workdir.as_ref().into());
127 self
128 }
129 pub fn rscript(mut self, rscript: impl AsRef<OsStr>) -> Self {
130 log::debug!(target: "faucet", "Using Rscript command: {:?}", rscript.as_ref());
131 self.rscript = Some(rscript.as_ref().into());
132 self
133 }
134 pub fn quarto(mut self, quarto: impl AsRef<OsStr>) -> Self {
135 log::debug!(target: "faucet", "Using quarto command: {:?}", quarto.as_ref());
136 self.quarto = Some(quarto.as_ref().into());
137 self
138 }
139 pub fn qmd(mut self, qmd: Option<impl AsRef<Path>>) -> Self {
140 self.qmd = qmd.map(|s| s.as_ref().into());
141 self
142 }
143 pub fn telemetry(mut self, telemetry_manager: Option<&TelemetryManager>) -> Self {
144 self.telemetry = telemetry_manager.map(|m| m.sender.clone());
145 self
146 }
147 pub fn route(mut self, route: String) -> Self {
148 self.route = Some(route);
149 self
150 }
151 pub fn build(self) -> FaucetResult<FaucetServerConfig> {
152 let server_type = self
153 .server_type
154 .ok_or(FaucetError::MissingArgument("server_type"))?;
155 let strategy = determine_strategy(server_type, self.strategy);
156 let bind = self.bind;
157 let n_workers = self.n_workers.unwrap_or_else(|| {
158 log::debug!(target: "faucet", "No number of workers specified. Defaulting to the number of logical cores.");
159 num_cpus::get().try_into().expect("num_cpus::get() returned 0")
160 });
161 let workdir = self.workdir
162 .map(|wd| leak!(wd, Path))
163 .unwrap_or_else(|| {
164 log::debug!(target: "faucet", "No workdir specified. Defaulting to the current directory.");
165 Path::new(".")
166 });
167 let rscript = self.rscript.map(|wd| leak!(wd, OsStr)).unwrap_or_else(|| {
168 log::debug!(target: "faucet", "No Rscript command specified. Defaulting to `Rscript`.");
169 OsStr::new("Rscript")
170 });
171 let extractor = self.extractor.unwrap_or_else(|| {
172 log::debug!(target: "faucet", "No IP extractor specified. Defaulting to client address.");
173 load_balancing::IpExtractor::ClientAddr
174 });
175 let app_dir = self.app_dir.map(|app_dir| leak!(app_dir, str));
176 let qmd = self.qmd.map(|qmd| leak!(qmd, Path));
177 let quarto = self.quarto.map(|qmd| leak!(qmd, OsStr)).unwrap_or_else(|| {
178 log::debug!(target: "faucet", "No quarto command specified. Defaulting to `quarto`.");
179 OsStr::new("quarto")
180 });
181 let telemetry = self.telemetry;
182 let route = self.route.map(|r| -> &'static _ { leak!(r) });
183 Ok(FaucetServerConfig {
184 strategy,
185 bind,
186 n_workers,
187 server_type,
188 workdir,
189 extractor,
190 rscript,
191 app_dir,
192 route,
193 quarto,
194 telemetry,
195 qmd,
196 })
197 }
198}
199
200impl Default for FaucetServerBuilder {
201 fn default() -> Self {
202 Self::new()
203 }
204}
205
206#[derive(Clone)]
207pub struct FaucetServerConfig {
208 pub strategy: Strategy,
209 pub bind: Option<SocketAddr>,
210 pub n_workers: NonZeroUsize,
211 pub server_type: WorkerType,
212 pub workdir: &'static Path,
213 pub extractor: load_balancing::IpExtractor,
214 pub rscript: &'static OsStr,
215 pub quarto: &'static OsStr,
216 pub telemetry: Option<TelemetrySender>,
217 pub app_dir: Option<&'static str>,
218 pub route: Option<&'static str>,
219 pub qmd: Option<&'static Path>,
220}
221
222impl FaucetServerConfig {
223 pub async fn run(self, shutdown: ShutdownSignal) -> FaucetResult<()> {
224 let telemetry = self.telemetry.clone();
225 let mut workers = Workers::new(self.clone(), shutdown.clone()).await?;
226 let targets = workers.get_workers_config();
227 let load_balancer = LoadBalancer::new(self.strategy, self.extractor, &targets)?;
228 let bind = self.bind.ok_or(FaucetError::MissingArgument("bind"))?;
229
230 let load_balancer = load_balancer.clone();
231 let service = Arc::new(
232 ServiceBuilder::new(ProxyService)
233 .layer(logging::LogLayer { telemetry })
234 .layer(AddStateLayer::new(load_balancer))
235 .build(),
236 );
237
238 let listener = TcpListener::bind(bind).await?;
240 log::info!(target: "faucet", "Listening on http://{}", bind);
241 let main_loop = || async {
242 loop {
243 match listener.accept().await {
244 Err(e) => {
245 log::error!(target: "faucet", "Unable to accept TCP connection: {e}");
246 return;
247 }
248 Ok((tcp, client_addr)) => {
249 let tcp = TokioIo::new(tcp);
250 log::debug!(target: "faucet", "Accepted TCP connection from {}", client_addr);
251
252 let service = service.clone();
253 let shutdown = shutdown.clone();
254
255 tokio::task::spawn(async move {
256 let mut conn = http1::Builder::new()
257 .half_close(true)
258 .serve_connection(
259 tcp,
260 service_fn(|req: Request<Incoming>| {
261 service.call(req, Some(client_addr.ip()))
262 }),
263 )
264 .with_upgrades();
265
266 let conn = pin!(&mut conn);
267
268 tokio::select! {
269 result = conn => {
270 if let Err(e) = result {
271 log::error!(target: "faucet", "Connection error: {:?}", e);
272 }
273 }
274 _ = shutdown.wait() => ()
275 }
276 });
277 }
278 };
279 }
280 };
281
282 tokio::select! {
284 _ = shutdown.wait() => (),
285 _ = main_loop() => (),
286 }
287
288 for worker in &mut workers.workers {
289 worker.child.wait_until_done().await;
290 }
291
292 FaucetResult::Ok(())
293 }
294 pub async fn extract_service(
295 self,
296 shutdown: ShutdownSignal,
297 ) -> FaucetResult<(FaucetServerService, Workers)> {
298 let telemetry = self.telemetry.clone();
299 let workers = Workers::new(self.clone(), shutdown).await?;
300 let targets = workers.get_workers_config();
301 let load_balancer = LoadBalancer::new(self.strategy, self.extractor, &targets)?;
302 let service = Arc::new(
303 ServiceBuilder::new(ProxyService)
304 .layer(logging::LogLayer { telemetry })
305 .layer(AddStateLayer::new(load_balancer))
306 .build(),
307 );
308
309 Ok((FaucetServerService { inner: service }, workers))
310 }
311}
312
313pub struct FaucetServerService {
314 inner: Arc<AddStateService<LogService<ProxyService>>>,
315}
316
317impl Clone for FaucetServerService {
318 fn clone(&self) -> Self {
319 FaucetServerService {
320 inner: Arc::clone(&self.inner),
321 }
322 }
323}
324
325impl Service<hyper::Request<Incoming>> for FaucetServerService {
326 type Error = FaucetError;
327 type Response = hyper::Response<ExclusiveBody>;
328 async fn call(
329 &self,
330 req: hyper::Request<Incoming>,
331 ip_addr: Option<std::net::IpAddr>,
332 ) -> Result<Self::Response, Self::Error> {
333 self.inner.call(req, ip_addr).await
334 }
335}