// faucet_server/client/worker.rs

1use crate::{
2    error::{FaucetError, FaucetResult},
3    leak,
4    networking::get_available_socket,
5    server::FaucetServerConfig,
6    shutdown::ShutdownSignal,
7};
8use std::{
9    ffi::OsStr,
10    net::SocketAddr,
11    path::Path,
12    sync::atomic::{AtomicBool, Ordering},
13    time::Duration,
14};
15use tokio::{process::Child, task::JoinHandle};
16use tokio_stream::StreamExt;
17use tokio_util::codec::{FramedRead, LinesCodec};
18
19#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Debug, serde::Deserialize)]
20pub enum WorkerType {
21    #[serde(alias = "plumber", alias = "Plumber")]
22    Plumber,
23    #[serde(alias = "shiny", alias = "Shiny")]
24    Shiny,
25    #[serde(alias = "quarto-shiny", alias = "QuartoShiny", alias = "quarto_shiny")]
26    QuartoShiny,
27}
28
29fn log_stdio(mut child: Child, target: &'static str) -> FaucetResult<Child> {
30    let pid = child.id().expect("Failed to get plumber worker PID");
31
32    let mut stdout = FramedRead::new(
33        child.stdout.take().ok_or(FaucetError::Unknown(format!(
34            "Unable to take stdout from PID {pid}"
35        )))?,
36        LinesCodec::new(),
37    );
38
39    let mut stderr = FramedRead::new(
40        child.stderr.take().ok_or(FaucetError::Unknown(format!(
41            "Unable to take stderr from PID {pid}"
42        )))?,
43        LinesCodec::new(),
44    );
45
46    tokio::spawn(async move {
47        while let Some(line) = stderr.next().await {
48            if let Ok(line) = line {
49                log::warn!(target: target, "{line}");
50            }
51        }
52    });
53
54    tokio::spawn(async move {
55        while let Some(line) = stdout.next().await {
56            if let Ok(line) = line {
57                log::info!(target: target, "{line}");
58            }
59        }
60    });
61
62    Ok(child)
63}
64
65#[derive(Copy, Clone)]
66pub struct WorkerConfig {
67    pub wtype: WorkerType,
68    pub app_dir: Option<&'static str>,
69    pub rscript: &'static OsStr,
70    pub quarto: &'static OsStr,
71    pub workdir: &'static Path,
72    pub addr: SocketAddr,
73    pub target: &'static str,
74    pub worker_id: usize,
75    pub worker_route: Option<&'static str>,
76    pub is_online: &'static AtomicBool,
77    pub qmd: Option<&'static Path>,
78}
79
80impl WorkerConfig {
81    fn new(worker_id: usize, addr: SocketAddr, server_config: &FaucetServerConfig) -> Self {
82        Self {
83            addr,
84            worker_id,
85            is_online: leak!(AtomicBool::new(false)),
86            workdir: server_config.workdir,
87            worker_route: server_config.route,
88            target: leak!(format!("Worker::{}", worker_id)),
89            app_dir: server_config.app_dir,
90            wtype: server_config.server_type,
91            rscript: server_config.rscript,
92            quarto: server_config.quarto,
93            qmd: server_config.qmd,
94        }
95    }
96    #[allow(dead_code)]
97    pub fn dummy(target: &'static str, addr: &str, online: bool) -> WorkerConfig {
98        WorkerConfig {
99            target,
100            is_online: leak!(AtomicBool::new(online)),
101            addr: addr.parse().unwrap(),
102            app_dir: None,
103            worker_route: None,
104            rscript: OsStr::new(""),
105            wtype: crate::client::worker::WorkerType::Shiny,
106            worker_id: 1,
107            quarto: OsStr::new(""),
108            workdir: Path::new("."),
109            qmd: None,
110        }
111    }
112}
113
114fn spawn_child_rscript_process(
115    config: WorkerConfig,
116    command: impl AsRef<str>,
117) -> FaucetResult<Child> {
118    let mut cmd = tokio::process::Command::new(config.rscript);
119
120    // Set the current directory to the directory containing the entrypoint
121    cmd.current_dir(config.workdir)
122        .arg("-e")
123        .arg(command.as_ref())
124        .stdin(std::process::Stdio::null())
125        .stdout(std::process::Stdio::piped())
126        .stderr(std::process::Stdio::piped())
127        .env("FAUCET_WORKER_ID", config.worker_id.to_string())
128        // This is needed to make sure the child process is killed when the parent is dropped
129        .kill_on_drop(true);
130
131    #[cfg(unix)]
132    unsafe {
133        cmd.pre_exec(|| {
134            // Create a new process group for the child process
135            nix::libc::setpgid(0, 0);
136            Ok(())
137        });
138    }
139
140    cmd.spawn().map_err(Into::into)
141}
142
143fn spawn_plumber_worker(config: WorkerConfig) -> FaucetResult<Child> {
144    let command = format!(
145        r#"
146        options("plumber.port" = {port})
147        plumber::pr_run(plumber::plumb())
148        "#,
149        port = config.addr.port()
150    );
151    let child = spawn_child_rscript_process(config, command)?;
152
153    log_stdio(child, config.target)
154}
155
156fn spawn_shiny_worker(config: WorkerConfig) -> FaucetResult<Child> {
157    let command = format!(
158        r#"
159        options("shiny.port" = {port})
160        shiny::runApp("{app_dir}")
161        "#,
162        port = config.addr.port(),
163        app_dir = config.app_dir.unwrap_or(".")
164    );
165    let child = spawn_child_rscript_process(config, command)?;
166
167    log_stdio(child, config.target)
168}
169
170fn spawn_quarto_shiny_worker(config: WorkerConfig) -> FaucetResult<Child> {
171    let mut cmd = tokio::process::Command::new(config.quarto);
172    // Set the current directory to the directory containing the entrypoint
173    cmd.current_dir(config.workdir)
174        .arg("serve")
175        .args(["--port", config.addr.port().to_string().as_str()])
176        .arg(config.qmd.ok_or(FaucetError::MissingArgument("qmd"))?)
177        .stdin(std::process::Stdio::null())
178        .stdout(std::process::Stdio::piped())
179        .stderr(std::process::Stdio::piped())
180        .env("FAUCET_WORKER_ID", config.worker_id.to_string())
181        // This is needed to make sure the child process is killed when the parent is dropped
182        .kill_on_drop(true);
183
184    #[cfg(unix)]
185    unsafe {
186        cmd.pre_exec(|| {
187            // Create a new process group for the child process
188            nix::libc::setpgid(0, 0);
189            Ok(())
190        });
191    }
192
193    let child = cmd.spawn()?;
194
195    log_stdio(child, config.target)
196}
197
198impl WorkerConfig {
199    fn spawn_process(self, config: WorkerConfig) -> Child {
200        let child_result = match self.wtype {
201            WorkerType::Plumber => spawn_plumber_worker(config),
202            WorkerType::Shiny => spawn_shiny_worker(config),
203            WorkerType::QuartoShiny => spawn_quarto_shiny_worker(config),
204        };
205        match child_result {
206            Ok(child) => child,
207            Err(e) => {
208                log::error!(target: "faucet", "Failed to invoke R for {target}: {e}", target = config.target);
209                log::error!(target: "faucet", "Exiting...");
210                std::process::exit(1);
211            }
212        }
213    }
214}
215
216pub struct Worker {
217    /// Whether the worker should be stopped
218    pub child: WorkerChild,
219    /// The address of the worker's socket.
220    pub config: WorkerConfig,
221}
222
223async fn check_if_online(addr: SocketAddr) -> bool {
224    let stream = tokio::net::TcpStream::connect(addr).await;
225    stream.is_ok()
226}
227
228const RECHECK_INTERVAL: Duration = Duration::from_millis(250);
229
230pub struct WorkerChild {
231    handle: Option<JoinHandle<FaucetResult<()>>>,
232}
233
234impl WorkerChild {
235    pub async fn wait_until_done(&mut self) {
236        if let Some(handle) = self.handle.take() {
237            let _ = handle.await;
238        }
239    }
240}
241
242fn spawn_worker_task(config: WorkerConfig, shutdown: ShutdownSignal) -> WorkerChild {
243    let handle = tokio::spawn(async move {
244        'outer: loop {
245            let mut child = config.spawn_process(config);
246            let pid = child.id().expect("Failed to get plumber worker PID");
247
248            // We will run this loop asynchrnously on this same thread.
249            // We will use this to wait for either the stop signal
250            // or the child exiting
251            let child_loop = async {
252                log::info!(target: "faucet", "Starting process {pid} for {target} on port {port}", port = config.addr.port(), target = config.target);
253                loop {
254                    // Try to connect to the socket
255                    let check_status = check_if_online(config.addr).await;
256                    // If it's online, we can break out of the loop and start serving connections
257                    if check_status {
258                        log::info!(target: "faucet", "{target} is online and ready to serve connections at {route}", target = config.target, route = config.worker_route.unwrap_or("/"));
259                        config.is_online.store(check_status, Ordering::SeqCst);
260                        break;
261                    }
262                    // If it's not online but the child process has exited, we should break out of the loop
263                    // and restart the process
264                    if child.try_wait()?.is_some() {
265                        break;
266                    }
267
268                    tokio::time::sleep(RECHECK_INTERVAL).await;
269                }
270                FaucetResult::Ok(child.wait().await?)
271            };
272            tokio::select! {
273                // If we receive a stop signal that means we will stop the outer loop
274                // and kill the process
275                _ = shutdown.wait() => {
276                    let _ = child.kill().await;
277                    log::info!(target: "faucet", "{target}'s process ({pid}) killed", target = config.target);
278                    break 'outer;
279                },
280                // If our child loop stops that means the process crashed. We will restart it
281                status = child_loop => {
282                    config
283                        .is_online
284                        .store(false, std::sync::atomic::Ordering::SeqCst);
285                    log::error!(target: "faucet", "{target}'s process ({}) exited with status {}", pid, status?, target = config.target);
286                    continue 'outer;
287                }
288            }
289        }
290        FaucetResult::Ok(())
291    });
292    WorkerChild {
293        handle: Some(handle),
294    }
295}
296
297impl Worker {
298    pub fn from_config(config: WorkerConfig, shutdown: ShutdownSignal) -> FaucetResult<Self> {
299        let child = spawn_worker_task(config, shutdown);
300        Ok(Self { child, config })
301    }
302}
303
304pub struct Workers {
305    pub workers: Box<[Worker]>,
306}
307
308const TRIES: usize = 20;
309
310impl Workers {
311    pub(crate) async fn new(
312        server_config: FaucetServerConfig,
313        shutdown: ShutdownSignal,
314    ) -> FaucetResult<Self> {
315        let mut workers = Vec::with_capacity(server_config.n_workers.get());
316
317        for id in 0..server_config.n_workers.get() {
318            let socket_addr = get_available_socket(TRIES).await?;
319            let config = WorkerConfig::new(id + 1, socket_addr, &server_config);
320            let worker = Worker::from_config(config, shutdown.clone())?;
321            workers.push(worker);
322        }
323
324        let workers = workers.into_boxed_slice();
325
326        Ok(Self { workers })
327    }
328    pub(crate) fn get_workers_config(&self) -> Vec<WorkerConfig> {
329        self.workers.iter().map(|w| w.config).collect()
330    }
331}