faucet_server/server/
mod.rs

1pub mod logging;
2pub use logging::{logger, HttpLogData, LogOption};
3use tokio_tungstenite::tungstenite::protocol::WebSocketConfig;
4pub mod onion;
5mod router;
6mod service;
7use crate::{
8    client::{
9        load_balancing::{self, LoadBalancer, Strategy},
10        worker::{WorkerConfigs, WorkerType},
11        ExclusiveBody,
12    },
13    error::{FaucetError, FaucetResult},
14    leak,
15    shutdown::ShutdownSignal,
16};
17use hyper::{body::Incoming, server::conn::http1, service::service_fn, Request};
18use hyper_util::rt::TokioIo;
19use onion::{Service, ServiceBuilder};
20use service::{AddStateLayer, ProxyService};
21use std::{
22    ffi::{OsStr, OsString},
23    net::SocketAddr,
24    num::NonZeroUsize,
25    path::{Path, PathBuf},
26    pin::pin,
27    sync::Arc,
28};
29use tokio::net::TcpListener;
30
31pub use router::RouterConfig;
32
33use self::{logging::LogService, service::AddStateService};
34
35fn determine_strategy(server_type: WorkerType, strategy: Option<Strategy>) -> Strategy {
36    match server_type {
37        WorkerType::Plumber =>
38            strategy.unwrap_or_else(|| {
39                log::debug!(target: "faucet", "No load balancing strategy specified. Defaulting to round robin for plumber.");
40                Strategy::RoundRobin
41            }),
42        WorkerType::Shiny | WorkerType::QuartoShiny => match strategy {
43            None => {
44                log::debug!(target: "faucet", "No load balancing strategy specified. Defaulting to IP hash for shiny.");
45                Strategy::IpHash
46            },
47            Some(Strategy::Rps) => {
48                log::debug!(target: "faucet", "RPS load balancing strategy specified for shiny, switching to IP hash.");
49                Strategy::IpHash
50            },
51            Some(Strategy::CookieHash) => Strategy::CookieHash,
52            Some(Strategy::RoundRobin) => {
53                log::debug!(target: "faucet", "Round robin load balancing strategy specified for shiny, switching to IP hash.");
54                Strategy::IpHash
55            },
56            Some(Strategy::IpHash) => Strategy::IpHash,
57        },
58        #[cfg(test)]
59        WorkerType::Dummy => {
60            log::debug!(target: "faucet", "WorkerType is Dummy, defaulting strategy to RoundRobin for tests.");
61            Strategy::RoundRobin
62        }
63    }
64}
65
66pub struct FaucetServerBuilder {
67    strategy: Option<Strategy>,
68    bind: Option<SocketAddr>,
69    n_workers: Option<NonZeroUsize>,
70    server_type: Option<WorkerType>,
71    workdir: Option<PathBuf>,
72    extractor: Option<load_balancing::IpExtractor>,
73    rscript: Option<OsString>,
74    app_dir: Option<String>,
75    quarto: Option<OsString>,
76    qmd: Option<PathBuf>,
77    route: Option<String>,
78    max_rps: Option<f64>,
79}
80
81impl FaucetServerBuilder {
82    pub fn new() -> Self {
83        FaucetServerBuilder {
84            strategy: None,
85            bind: None,
86            n_workers: None,
87            server_type: None,
88            workdir: None,
89            extractor: None,
90            rscript: None,
91            app_dir: None,
92            route: None,
93            quarto: None,
94            qmd: None,
95            max_rps: None,
96        }
97    }
98    pub fn app_dir(mut self, app_dir: Option<impl AsRef<str>>) -> Self {
99        self.app_dir = app_dir.map(|s| s.as_ref().into());
100        self
101    }
102    pub fn strategy(mut self, strategy: Option<Strategy>) -> Self {
103        log::debug!(target: "faucet", "Using load balancing strategy: {strategy:?}");
104        self.strategy = strategy;
105        self
106    }
107    pub fn bind(mut self, bind: SocketAddr) -> Self {
108        log::debug!(target: "faucet", "Will bind to: {bind}");
109        self.bind = Some(bind);
110        self
111    }
112    pub fn extractor(mut self, extractor: load_balancing::IpExtractor) -> Self {
113        log::debug!(target: "faucet", "Using IP extractor: {extractor:?}");
114        self.extractor = Some(extractor);
115        self
116    }
117    pub fn workers(mut self, n: usize) -> Self {
118        log::debug!(target: "faucet", "Will spawn {n} workers");
119        self.n_workers = match n.try_into() {
120            Ok(n) => Some(n),
121            Err(_) => {
122                log::error!(target: "faucet", "Number of workers must be greater than 0");
123                std::process::exit(1);
124            }
125        };
126        self
127    }
128    pub fn server_type(mut self, server_type: WorkerType) -> Self {
129        log::debug!(target: "faucet", "Using worker type: {server_type:?}");
130        self.server_type = Some(server_type);
131        self
132    }
133    pub fn workdir(mut self, workdir: impl AsRef<Path>) -> Self {
134        log::debug!(target: "faucet", "Using workdir: {:?}", workdir.as_ref());
135        self.workdir = Some(workdir.as_ref().into());
136        self
137    }
138    pub fn rscript(mut self, rscript: impl AsRef<OsStr>) -> Self {
139        log::debug!(target: "faucet", "Using Rscript command: {:?}", rscript.as_ref());
140        self.rscript = Some(rscript.as_ref().into());
141        self
142    }
143    pub fn quarto(mut self, quarto: impl AsRef<OsStr>) -> Self {
144        log::debug!(target: "faucet", "Using quarto command: {:?}", quarto.as_ref());
145        self.quarto = Some(quarto.as_ref().into());
146        self
147    }
148    pub fn qmd(mut self, qmd: Option<impl AsRef<Path>>) -> Self {
149        self.qmd = qmd.map(|s| s.as_ref().into());
150        self
151    }
152    pub fn route(mut self, route: String) -> Self {
153        self.route = Some(route);
154        self
155    }
156    pub fn max_rps(mut self, max_rps: Option<f64>) -> Self {
157        self.max_rps = max_rps;
158        self
159    }
160    pub fn build(self) -> FaucetResult<FaucetServerConfig> {
161        let server_type = self
162            .server_type
163            .ok_or(FaucetError::MissingArgument("server_type"))?;
164        let strategy = determine_strategy(server_type, self.strategy);
165        let bind = self.bind;
166        let n_workers = self.n_workers.unwrap_or_else(|| {
167            log::debug!(target: "faucet", "No number of workers specified. Defaulting to the number of logical cores.");
168            num_cpus::get().try_into().expect("num_cpus::get() returned 0")
169        });
170        let workdir = self.workdir
171            .map(|wd| leak!(wd, Path))
172            .unwrap_or_else(|| {
173                log::debug!(target: "faucet", "No workdir specified. Defaulting to the current directory.");
174                Path::new(".")
175            });
176        let rscript = self.rscript.map(|wd| leak!(wd, OsStr)).unwrap_or_else(|| {
177            log::debug!(target: "faucet", "No Rscript command specified. Defaulting to `Rscript`.");
178            OsStr::new("Rscript")
179        });
180        let extractor = self.extractor.unwrap_or_else(|| {
181            log::debug!(target: "faucet", "No IP extractor specified. Defaulting to client address.");
182            load_balancing::IpExtractor::ClientAddr
183        });
184        let app_dir = self.app_dir.map(|app_dir| leak!(app_dir, str));
185        let qmd = self.qmd.map(|qmd| leak!(qmd, Path));
186        let quarto = self.quarto.map(|qmd| leak!(qmd, OsStr)).unwrap_or_else(|| {
187            log::debug!(target: "faucet", "No quarto command specified. Defaulting to `quarto`.");
188            OsStr::new("quarto")
189        });
190        let route = self.route.map(|r| -> &'static _ { leak!(r) });
191        let max_rps = self.max_rps;
192        Ok(FaucetServerConfig {
193            strategy,
194            bind,
195            n_workers,
196            server_type,
197            workdir,
198            extractor,
199            rscript,
200            app_dir,
201            route,
202            quarto,
203            qmd,
204            max_rps,
205        })
206    }
207}
208
209impl Default for FaucetServerBuilder {
210    fn default() -> Self {
211        Self::new()
212    }
213}
214
215#[derive(Clone)]
216pub struct FaucetServerConfig {
217    pub strategy: Strategy,
218    pub bind: Option<SocketAddr>,
219    pub n_workers: NonZeroUsize,
220    pub server_type: WorkerType,
221    pub workdir: &'static Path,
222    pub extractor: load_balancing::IpExtractor,
223    pub rscript: &'static OsStr,
224    pub quarto: &'static OsStr,
225    pub app_dir: Option<&'static str>,
226    pub route: Option<&'static str>,
227    pub qmd: Option<&'static Path>,
228    pub max_rps: Option<f64>,
229}
230
231impl FaucetServerConfig {
232    pub async fn run(
233        self,
234        shutdown: &'static ShutdownSignal,
235        websocket_config: &'static WebSocketConfig,
236    ) -> FaucetResult<()> {
237        let mut workers = WorkerConfigs::new(self.clone(), shutdown).await?;
238        let load_balancer = LoadBalancer::new(
239            self.strategy,
240            self.extractor,
241            &workers.workers,
242            self.max_rps,
243        )
244        .await?;
245        let bind = self.bind.ok_or(FaucetError::MissingArgument("bind"))?;
246
247        let load_balancer = load_balancer.clone();
248        let service = Arc::new(
249            ServiceBuilder::new(ProxyService {
250                shutdown,
251                websocket_config,
252            })
253            .layer(logging::LogLayer {})
254            .layer(AddStateLayer::new(load_balancer))
255            .build(),
256        );
257
258        // Bind to the port and listen for incoming TCP connections
259        let listener = TcpListener::bind(bind).await?;
260        log::info!(target: "faucet", "Listening on http://{bind}");
261        let main_loop = || async {
262            loop {
263                match listener.accept().await {
264                    Err(e) => {
265                        log::error!(target: "faucet", "Unable to accept TCP connection: {e}");
266                        return;
267                    }
268                    Ok((tcp, client_addr)) => {
269                        let tcp = TokioIo::new(tcp);
270                        log::debug!(target: "faucet", "Accepted TCP connection from {client_addr}");
271
272                        let service = service.clone();
273
274                        tokio::task::spawn(async move {
275                            let mut conn = http1::Builder::new()
276                                .half_close(true)
277                                .serve_connection(
278                                    tcp,
279                                    service_fn(|req: Request<Incoming>| {
280                                        service.call(req, Some(client_addr.ip()))
281                                    }),
282                                )
283                                .with_upgrades();
284
285                            let conn = pin!(&mut conn);
286
287                            tokio::select! {
288                                result = conn => {
289                                    if let Err(e) = result {
290                                        log::error!(target: "faucet", "Connection error: {e:?}");
291                                    }
292                                }
293                                _ = shutdown.wait() => ()
294                            }
295                        });
296                    }
297                };
298            }
299        };
300
301        // Race the shutdown vs the main loop
302        tokio::select! {
303            _ = shutdown.wait() => (),
304            _ = main_loop() => (),
305        }
306
307        log::debug!("Main loop ended!");
308
309        for worker in &mut workers.workers {
310            log::debug!("Waiting for {} to finish", worker.target);
311            worker.wait_until_done().await;
312        }
313
314        log::debug!("All workers are finished!");
315
316        FaucetResult::Ok(())
317    }
318    pub async fn extract_service(
319        self,
320        shutdown: &'static ShutdownSignal,
321        websocket_config: &'static WebSocketConfig,
322    ) -> FaucetResult<(FaucetServerService, WorkerConfigs)> {
323        let workers = WorkerConfigs::new(self.clone(), shutdown).await?;
324        let load_balancer = LoadBalancer::new(
325            self.strategy,
326            self.extractor,
327            &workers.workers,
328            self.max_rps,
329        )
330        .await?;
331        let service = Arc::new(
332            ServiceBuilder::new(ProxyService {
333                shutdown,
334                websocket_config,
335            })
336            .layer(logging::LogLayer {})
337            .layer(AddStateLayer::new(load_balancer))
338            .build(),
339        );
340
341        Ok((FaucetServerService { inner: service }, workers))
342    }
343}
344
345pub struct FaucetServerService {
346    inner: Arc<AddStateService<LogService<ProxyService>>>,
347}
348
349impl Clone for FaucetServerService {
350    fn clone(&self) -> Self {
351        FaucetServerService {
352            inner: Arc::clone(&self.inner),
353        }
354    }
355}
356
357impl Service<hyper::Request<Incoming>> for FaucetServerService {
358    type Error = FaucetError;
359    type Response = hyper::Response<ExclusiveBody>;
360    async fn call(
361        &self,
362        req: hyper::Request<Incoming>,
363        ip_addr: Option<std::net::IpAddr>,
364    ) -> Result<Self::Response, Self::Error> {
365        self.inner.call(req, ip_addr).await
366    }
367}