faucet_server/server/
mod.rs

1pub mod logging;
2pub use logging::{logger, HttpLogData, LogOption};
3pub mod onion;
4mod router;
5mod service;
6use crate::{
7    client::{
8        load_balancing::{self, LoadBalancer, Strategy},
9        worker::{WorkerConfigs, WorkerType},
10        ExclusiveBody,
11    },
12    error::{FaucetError, FaucetResult},
13    leak,
14    shutdown::ShutdownSignal,
15};
16use hyper::{body::Incoming, server::conn::http1, service::service_fn, Request};
17use hyper_util::rt::TokioIo;
18use onion::{Service, ServiceBuilder};
19use service::{AddStateLayer, ProxyService};
20use std::{
21    ffi::{OsStr, OsString},
22    net::SocketAddr,
23    num::NonZeroUsize,
24    path::{Path, PathBuf},
25    pin::pin,
26    sync::Arc,
27};
28use tokio::net::TcpListener;
29
30pub use router::RouterConfig;
31
32use self::{logging::LogService, service::AddStateService};
33
34fn determine_strategy(server_type: WorkerType, strategy: Option<Strategy>) -> Strategy {
35    match server_type {
36        WorkerType::Plumber =>
37            strategy.unwrap_or_else(|| {
38                log::debug!(target: "faucet", "No load balancing strategy specified. Defaulting to round robin for plumber.");
39                Strategy::RoundRobin
40            }),
41        WorkerType::Shiny | WorkerType::QuartoShiny => match strategy {
42            None => {
43                log::debug!(target: "faucet", "No load balancing strategy specified. Defaulting to IP hash for shiny.");
44                Strategy::IpHash
45            },
46            Some(Strategy::Rps) => {
47                log::debug!(target: "faucet", "RPS load balancing strategy specified for shiny, switching to IP hash.");
48                Strategy::IpHash
49            },
50            Some(Strategy::CookieHash) => Strategy::CookieHash,
51            Some(Strategy::RoundRobin) => {
52                log::debug!(target: "faucet", "Round robin load balancing strategy specified for shiny, switching to IP hash.");
53                Strategy::IpHash
54            },
55            Some(Strategy::IpHash) => Strategy::IpHash,
56        },
57        #[cfg(test)]
58        WorkerType::Dummy => {
59            log::debug!(target: "faucet", "WorkerType is Dummy, defaulting strategy to RoundRobin for tests.");
60            Strategy::RoundRobin
61        }
62    }
63}
64
65pub struct FaucetServerBuilder {
66    strategy: Option<Strategy>,
67    bind: Option<SocketAddr>,
68    n_workers: Option<NonZeroUsize>,
69    server_type: Option<WorkerType>,
70    workdir: Option<PathBuf>,
71    extractor: Option<load_balancing::IpExtractor>,
72    rscript: Option<OsString>,
73    app_dir: Option<String>,
74    quarto: Option<OsString>,
75    qmd: Option<PathBuf>,
76    route: Option<String>,
77    max_rps: Option<f64>,
78}
79
80impl FaucetServerBuilder {
81    pub fn new() -> Self {
82        FaucetServerBuilder {
83            strategy: None,
84            bind: None,
85            n_workers: None,
86            server_type: None,
87            workdir: None,
88            extractor: None,
89            rscript: None,
90            app_dir: None,
91            route: None,
92            quarto: None,
93            qmd: None,
94            max_rps: None,
95        }
96    }
97    pub fn app_dir(mut self, app_dir: Option<impl AsRef<str>>) -> Self {
98        self.app_dir = app_dir.map(|s| s.as_ref().into());
99        self
100    }
101    pub fn strategy(mut self, strategy: Option<Strategy>) -> Self {
102        log::debug!(target: "faucet", "Using load balancing strategy: {strategy:?}");
103        self.strategy = strategy;
104        self
105    }
106    pub fn bind(mut self, bind: SocketAddr) -> Self {
107        log::debug!(target: "faucet", "Will bind to: {bind}");
108        self.bind = Some(bind);
109        self
110    }
111    pub fn extractor(mut self, extractor: load_balancing::IpExtractor) -> Self {
112        log::debug!(target: "faucet", "Using IP extractor: {extractor:?}");
113        self.extractor = Some(extractor);
114        self
115    }
116    pub fn workers(mut self, n: usize) -> Self {
117        log::debug!(target: "faucet", "Will spawn {n} workers");
118        self.n_workers = match n.try_into() {
119            Ok(n) => Some(n),
120            Err(_) => {
121                log::error!(target: "faucet", "Number of workers must be greater than 0");
122                std::process::exit(1);
123            }
124        };
125        self
126    }
127    pub fn server_type(mut self, server_type: WorkerType) -> Self {
128        log::debug!(target: "faucet", "Using worker type: {server_type:?}");
129        self.server_type = Some(server_type);
130        self
131    }
132    pub fn workdir(mut self, workdir: impl AsRef<Path>) -> Self {
133        log::debug!(target: "faucet", "Using workdir: {:?}", workdir.as_ref());
134        self.workdir = Some(workdir.as_ref().into());
135        self
136    }
137    pub fn rscript(mut self, rscript: impl AsRef<OsStr>) -> Self {
138        log::debug!(target: "faucet", "Using Rscript command: {:?}", rscript.as_ref());
139        self.rscript = Some(rscript.as_ref().into());
140        self
141    }
142    pub fn quarto(mut self, quarto: impl AsRef<OsStr>) -> Self {
143        log::debug!(target: "faucet", "Using quarto command: {:?}", quarto.as_ref());
144        self.quarto = Some(quarto.as_ref().into());
145        self
146    }
147    pub fn qmd(mut self, qmd: Option<impl AsRef<Path>>) -> Self {
148        self.qmd = qmd.map(|s| s.as_ref().into());
149        self
150    }
151    pub fn route(mut self, route: String) -> Self {
152        self.route = Some(route);
153        self
154    }
155    pub fn max_rps(mut self, max_rps: Option<f64>) -> Self {
156        self.max_rps = max_rps;
157        self
158    }
159    pub fn build(self) -> FaucetResult<FaucetServerConfig> {
160        let server_type = self
161            .server_type
162            .ok_or(FaucetError::MissingArgument("server_type"))?;
163        let strategy = determine_strategy(server_type, self.strategy);
164        let bind = self.bind;
165        let n_workers = self.n_workers.unwrap_or_else(|| {
166            log::debug!(target: "faucet", "No number of workers specified. Defaulting to the number of logical cores.");
167            num_cpus::get().try_into().expect("num_cpus::get() returned 0")
168        });
169        let workdir = self.workdir
170            .map(|wd| leak!(wd, Path))
171            .unwrap_or_else(|| {
172                log::debug!(target: "faucet", "No workdir specified. Defaulting to the current directory.");
173                Path::new(".")
174            });
175        let rscript = self.rscript.map(|wd| leak!(wd, OsStr)).unwrap_or_else(|| {
176            log::debug!(target: "faucet", "No Rscript command specified. Defaulting to `Rscript`.");
177            OsStr::new("Rscript")
178        });
179        let extractor = self.extractor.unwrap_or_else(|| {
180            log::debug!(target: "faucet", "No IP extractor specified. Defaulting to client address.");
181            load_balancing::IpExtractor::ClientAddr
182        });
183        let app_dir = self.app_dir.map(|app_dir| leak!(app_dir, str));
184        let qmd = self.qmd.map(|qmd| leak!(qmd, Path));
185        let quarto = self.quarto.map(|qmd| leak!(qmd, OsStr)).unwrap_or_else(|| {
186            log::debug!(target: "faucet", "No quarto command specified. Defaulting to `quarto`.");
187            OsStr::new("quarto")
188        });
189        let route = self.route.map(|r| -> &'static _ { leak!(r) });
190        let max_rps = self.max_rps;
191        Ok(FaucetServerConfig {
192            strategy,
193            bind,
194            n_workers,
195            server_type,
196            workdir,
197            extractor,
198            rscript,
199            app_dir,
200            route,
201            quarto,
202            qmd,
203            max_rps,
204        })
205    }
206}
207
208impl Default for FaucetServerBuilder {
209    fn default() -> Self {
210        Self::new()
211    }
212}
213
214#[derive(Clone)]
215pub struct FaucetServerConfig {
216    pub strategy: Strategy,
217    pub bind: Option<SocketAddr>,
218    pub n_workers: NonZeroUsize,
219    pub server_type: WorkerType,
220    pub workdir: &'static Path,
221    pub extractor: load_balancing::IpExtractor,
222    pub rscript: &'static OsStr,
223    pub quarto: &'static OsStr,
224    pub app_dir: Option<&'static str>,
225    pub route: Option<&'static str>,
226    pub qmd: Option<&'static Path>,
227    pub max_rps: Option<f64>,
228}
229
230impl FaucetServerConfig {
231    pub async fn run(self, shutdown: &'static ShutdownSignal) -> FaucetResult<()> {
232        let mut workers = WorkerConfigs::new(self.clone(), shutdown).await?;
233        let load_balancer = LoadBalancer::new(
234            self.strategy,
235            self.extractor,
236            &workers.workers,
237            self.max_rps,
238        )
239        .await?;
240        let bind = self.bind.ok_or(FaucetError::MissingArgument("bind"))?;
241
242        let load_balancer = load_balancer.clone();
243        let service = Arc::new(
244            ServiceBuilder::new(ProxyService { shutdown })
245                .layer(logging::LogLayer {})
246                .layer(AddStateLayer::new(load_balancer))
247                .build(),
248        );
249
250        // Bind to the port and listen for incoming TCP connections
251        let listener = TcpListener::bind(bind).await?;
252        log::info!(target: "faucet", "Listening on http://{bind}");
253        let main_loop = || async {
254            loop {
255                match listener.accept().await {
256                    Err(e) => {
257                        log::error!(target: "faucet", "Unable to accept TCP connection: {e}");
258                        return;
259                    }
260                    Ok((tcp, client_addr)) => {
261                        let tcp = TokioIo::new(tcp);
262                        log::debug!(target: "faucet", "Accepted TCP connection from {client_addr}");
263
264                        let service = service.clone();
265
266                        tokio::task::spawn(async move {
267                            let mut conn = http1::Builder::new()
268                                .half_close(true)
269                                .serve_connection(
270                                    tcp,
271                                    service_fn(|req: Request<Incoming>| {
272                                        service.call(req, Some(client_addr.ip()))
273                                    }),
274                                )
275                                .with_upgrades();
276
277                            let conn = pin!(&mut conn);
278
279                            tokio::select! {
280                                result = conn => {
281                                    if let Err(e) = result {
282                                        log::error!(target: "faucet", "Connection error: {e:?}");
283                                    }
284                                }
285                                _ = shutdown.wait() => ()
286                            }
287                        });
288                    }
289                };
290            }
291        };
292
293        // Race the shutdown vs the main loop
294        tokio::select! {
295            _ = shutdown.wait() => (),
296            _ = main_loop() => (),
297        }
298
299        log::debug!("Main loop ended!");
300
301        for worker in &mut workers.workers {
302            log::debug!("Waiting for {} to finish", worker.target);
303            worker.wait_until_done().await;
304        }
305
306        log::debug!("All workers are finished!");
307
308        FaucetResult::Ok(())
309    }
310    pub async fn extract_service(
311        self,
312        shutdown: &'static ShutdownSignal,
313    ) -> FaucetResult<(FaucetServerService, WorkerConfigs)> {
314        let workers = WorkerConfigs::new(self.clone(), shutdown).await?;
315        let load_balancer = LoadBalancer::new(
316            self.strategy,
317            self.extractor,
318            &workers.workers,
319            self.max_rps,
320        )
321        .await?;
322        let service = Arc::new(
323            ServiceBuilder::new(ProxyService { shutdown })
324                .layer(logging::LogLayer {})
325                .layer(AddStateLayer::new(load_balancer))
326                .build(),
327        );
328
329        Ok((FaucetServerService { inner: service }, workers))
330    }
331}
332
333pub struct FaucetServerService {
334    inner: Arc<AddStateService<LogService<ProxyService>>>,
335}
336
337impl Clone for FaucetServerService {
338    fn clone(&self) -> Self {
339        FaucetServerService {
340            inner: Arc::clone(&self.inner),
341        }
342    }
343}
344
345impl Service<hyper::Request<Incoming>> for FaucetServerService {
346    type Error = FaucetError;
347    type Response = hyper::Response<ExclusiveBody>;
348    async fn call(
349        &self,
350        req: hyper::Request<Incoming>,
351        ip_addr: Option<std::net::IpAddr>,
352    ) -> Result<Self::Response, Self::Error> {
353        self.inner.call(req, ip_addr).await
354    }
355}