faucet_server/server/
mod.rs

1mod logging;
2pub use logging::{logger, LogData, LogOption};
3pub mod onion;
4mod router;
5mod service;
6use crate::{
7    client::{
8        load_balancing::{self, LoadBalancer, Strategy},
9        worker::{WorkerType, Workers},
10        ExclusiveBody,
11    },
12    error::{FaucetError, FaucetResult},
13    leak,
14    shutdown::ShutdownSignal,
15    telemetry::{TelemetryManager, TelemetrySender},
16};
17use hyper::{body::Incoming, server::conn::http1, service::service_fn, Request};
18use hyper_util::rt::TokioIo;
19use onion::{Service, ServiceBuilder};
20use service::{AddStateLayer, ProxyService};
21use std::{
22    ffi::{OsStr, OsString},
23    net::SocketAddr,
24    num::NonZeroUsize,
25    path::{Path, PathBuf},
26    pin::pin,
27    sync::Arc,
28};
29use tokio::net::TcpListener;
30
31pub use router::RouterConfig;
32
33use self::{logging::LogService, service::AddStateService};
34
35fn determine_strategy(server_type: WorkerType, strategy: Option<Strategy>) -> Strategy {
36    match server_type {
37        WorkerType::Plumber =>
38            strategy.unwrap_or_else(|| {
39                log::debug!(target: "faucet", "No load balancing strategy specified. Defaulting to round robin for plumber.");
40                Strategy::RoundRobin
41            }),
42        WorkerType::Shiny | WorkerType::QuartoShiny => match strategy {
43            None => {
44                log::debug!(target: "faucet", "No load balancing strategy specified. Defaulting to IP hash for shiny.");
45                Strategy::IpHash
46            },
47            Some(Strategy::CookieHash) => Strategy::CookieHash,
48            Some(Strategy::RoundRobin) => {
49                log::debug!(target: "faucet", "Round robin load balancing strategy specified for shiny, switching to IP hash.");
50                Strategy::IpHash
51            },
52            Some(Strategy::IpHash) => Strategy::IpHash,
53        }
54    }
55}
56
57pub struct FaucetServerBuilder {
58    strategy: Option<Strategy>,
59    bind: Option<SocketAddr>,
60    n_workers: Option<NonZeroUsize>,
61    server_type: Option<WorkerType>,
62    workdir: Option<PathBuf>,
63    extractor: Option<load_balancing::IpExtractor>,
64    rscript: Option<OsString>,
65    app_dir: Option<String>,
66    quarto: Option<OsString>,
67    qmd: Option<PathBuf>,
68    route: Option<String>,
69    telemetry: Option<TelemetrySender>,
70}
71
72impl FaucetServerBuilder {
73    pub fn new() -> Self {
74        FaucetServerBuilder {
75            strategy: None,
76            bind: None,
77            n_workers: None,
78            server_type: None,
79            workdir: None,
80            extractor: None,
81            rscript: None,
82            app_dir: None,
83            route: None,
84            quarto: None,
85            qmd: None,
86            telemetry: None,
87        }
88    }
89    pub fn app_dir(mut self, app_dir: Option<impl AsRef<str>>) -> Self {
90        self.app_dir = app_dir.map(|s| s.as_ref().into());
91        self
92    }
93    pub fn strategy(mut self, strategy: Option<Strategy>) -> Self {
94        log::debug!(target: "faucet", "Using load balancing strategy: {:?}", strategy);
95        self.strategy = strategy;
96        self
97    }
98    pub fn bind(mut self, bind: SocketAddr) -> Self {
99        log::debug!(target: "faucet", "Will bind to: {}", bind);
100        self.bind = Some(bind);
101        self
102    }
103    pub fn extractor(mut self, extractor: load_balancing::IpExtractor) -> Self {
104        log::debug!(target: "faucet", "Using IP extractor: {:?}", extractor);
105        self.extractor = Some(extractor);
106        self
107    }
108    pub fn workers(mut self, n: usize) -> Self {
109        log::debug!(target: "faucet", "Will spawn {} workers", n);
110        self.n_workers = match n.try_into() {
111            Ok(n) => Some(n),
112            Err(_) => {
113                log::error!(target: "faucet", "Number of workers must be greater than 0");
114                std::process::exit(1);
115            }
116        };
117        self
118    }
119    pub fn server_type(mut self, server_type: WorkerType) -> Self {
120        log::debug!(target: "faucet", "Using worker type: {:?}", server_type);
121        self.server_type = Some(server_type);
122        self
123    }
124    pub fn workdir(mut self, workdir: impl AsRef<Path>) -> Self {
125        log::debug!(target: "faucet", "Using workdir: {:?}", workdir.as_ref());
126        self.workdir = Some(workdir.as_ref().into());
127        self
128    }
129    pub fn rscript(mut self, rscript: impl AsRef<OsStr>) -> Self {
130        log::debug!(target: "faucet", "Using Rscript command: {:?}", rscript.as_ref());
131        self.rscript = Some(rscript.as_ref().into());
132        self
133    }
134    pub fn quarto(mut self, quarto: impl AsRef<OsStr>) -> Self {
135        log::debug!(target: "faucet", "Using quarto command: {:?}", quarto.as_ref());
136        self.quarto = Some(quarto.as_ref().into());
137        self
138    }
139    pub fn qmd(mut self, qmd: Option<impl AsRef<Path>>) -> Self {
140        self.qmd = qmd.map(|s| s.as_ref().into());
141        self
142    }
143    pub fn telemetry(mut self, telemetry_manager: Option<&TelemetryManager>) -> Self {
144        self.telemetry = telemetry_manager.map(|m| m.sender.clone());
145        self
146    }
147    pub fn route(mut self, route: String) -> Self {
148        self.route = Some(route);
149        self
150    }
151    pub fn build(self) -> FaucetResult<FaucetServerConfig> {
152        let server_type = self
153            .server_type
154            .ok_or(FaucetError::MissingArgument("server_type"))?;
155        let strategy = determine_strategy(server_type, self.strategy);
156        let bind = self.bind;
157        let n_workers = self.n_workers.unwrap_or_else(|| {
158            log::debug!(target: "faucet", "No number of workers specified. Defaulting to the number of logical cores.");
159            num_cpus::get().try_into().expect("num_cpus::get() returned 0")
160        });
161        let workdir = self.workdir
162            .map(|wd| leak!(wd, Path))
163            .unwrap_or_else(|| {
164                log::debug!(target: "faucet", "No workdir specified. Defaulting to the current directory.");
165                Path::new(".")
166            });
167        let rscript = self.rscript.map(|wd| leak!(wd, OsStr)).unwrap_or_else(|| {
168            log::debug!(target: "faucet", "No Rscript command specified. Defaulting to `Rscript`.");
169            OsStr::new("Rscript")
170        });
171        let extractor = self.extractor.unwrap_or_else(|| {
172            log::debug!(target: "faucet", "No IP extractor specified. Defaulting to client address.");
173            load_balancing::IpExtractor::ClientAddr
174        });
175        let app_dir = self.app_dir.map(|app_dir| leak!(app_dir, str));
176        let qmd = self.qmd.map(|qmd| leak!(qmd, Path));
177        let quarto = self.quarto.map(|qmd| leak!(qmd, OsStr)).unwrap_or_else(|| {
178            log::debug!(target: "faucet", "No quarto command specified. Defaulting to `quarto`.");
179            OsStr::new("quarto")
180        });
181        let telemetry = self.telemetry;
182        let route = self.route.map(|r| -> &'static _ { leak!(r) });
183        Ok(FaucetServerConfig {
184            strategy,
185            bind,
186            n_workers,
187            server_type,
188            workdir,
189            extractor,
190            rscript,
191            app_dir,
192            route,
193            quarto,
194            telemetry,
195            qmd,
196        })
197    }
198}
199
200impl Default for FaucetServerBuilder {
201    fn default() -> Self {
202        Self::new()
203    }
204}
205
206#[derive(Clone)]
207pub struct FaucetServerConfig {
208    pub strategy: Strategy,
209    pub bind: Option<SocketAddr>,
210    pub n_workers: NonZeroUsize,
211    pub server_type: WorkerType,
212    pub workdir: &'static Path,
213    pub extractor: load_balancing::IpExtractor,
214    pub rscript: &'static OsStr,
215    pub quarto: &'static OsStr,
216    pub telemetry: Option<TelemetrySender>,
217    pub app_dir: Option<&'static str>,
218    pub route: Option<&'static str>,
219    pub qmd: Option<&'static Path>,
220}
221
222impl FaucetServerConfig {
223    pub async fn run(self, shutdown: ShutdownSignal) -> FaucetResult<()> {
224        let telemetry = self.telemetry.clone();
225        let mut workers = Workers::new(self.clone(), shutdown.clone()).await?;
226        let targets = workers.get_workers_config();
227        let load_balancer = LoadBalancer::new(self.strategy, self.extractor, &targets)?;
228        let bind = self.bind.ok_or(FaucetError::MissingArgument("bind"))?;
229
230        let load_balancer = load_balancer.clone();
231        let service = Arc::new(
232            ServiceBuilder::new(ProxyService)
233                .layer(logging::LogLayer { telemetry })
234                .layer(AddStateLayer::new(load_balancer))
235                .build(),
236        );
237
238        // Bind to the port and listen for incoming TCP connections
239        let listener = TcpListener::bind(bind).await?;
240        log::info!(target: "faucet", "Listening on http://{}", bind);
241        let main_loop = || async {
242            loop {
243                match listener.accept().await {
244                    Err(e) => {
245                        log::error!(target: "faucet", "Unable to accept TCP connection: {e}");
246                        return;
247                    }
248                    Ok((tcp, client_addr)) => {
249                        let tcp = TokioIo::new(tcp);
250                        log::debug!(target: "faucet", "Accepted TCP connection from {}", client_addr);
251
252                        let service = service.clone();
253                        let shutdown = shutdown.clone();
254
255                        tokio::task::spawn(async move {
256                            let mut conn = http1::Builder::new()
257                                .half_close(true)
258                                .serve_connection(
259                                    tcp,
260                                    service_fn(|req: Request<Incoming>| {
261                                        service.call(req, Some(client_addr.ip()))
262                                    }),
263                                )
264                                .with_upgrades();
265
266                            let conn = pin!(&mut conn);
267
268                            tokio::select! {
269                                result = conn => {
270                                    if let Err(e) = result {
271                                        log::error!(target: "faucet", "Connection error: {:?}", e);
272                                    }
273                                }
274                                _ = shutdown.wait() => ()
275                            }
276                        });
277                    }
278                };
279            }
280        };
281
282        // Race the shutdown vs the main loop
283        tokio::select! {
284            _ = shutdown.wait() => (),
285            _ = main_loop() => (),
286        }
287
288        for worker in &mut workers.workers {
289            worker.child.wait_until_done().await;
290        }
291
292        FaucetResult::Ok(())
293    }
294    pub async fn extract_service(
295        self,
296        shutdown: ShutdownSignal,
297    ) -> FaucetResult<(FaucetServerService, Workers)> {
298        let telemetry = self.telemetry.clone();
299        let workers = Workers::new(self.clone(), shutdown).await?;
300        let targets = workers.get_workers_config();
301        let load_balancer = LoadBalancer::new(self.strategy, self.extractor, &targets)?;
302        let service = Arc::new(
303            ServiceBuilder::new(ProxyService)
304                .layer(logging::LogLayer { telemetry })
305                .layer(AddStateLayer::new(load_balancer))
306                .build(),
307        );
308
309        Ok((FaucetServerService { inner: service }, workers))
310    }
311}
312
313pub struct FaucetServerService {
314    inner: Arc<AddStateService<LogService<ProxyService>>>,
315}
316
317impl Clone for FaucetServerService {
318    fn clone(&self) -> Self {
319        FaucetServerService {
320            inner: Arc::clone(&self.inner),
321        }
322    }
323}
324
325impl Service<hyper::Request<Incoming>> for FaucetServerService {
326    type Error = FaucetError;
327    type Response = hyper::Response<ExclusiveBody>;
328    async fn call(
329        &self,
330        req: hyper::Request<Incoming>,
331        ip_addr: Option<std::net::IpAddr>,
332    ) -> Result<Self::Response, Self::Error> {
333        self.inner.call(req, ip_addr).await
334    }
335}