1use crate::{
2 error::{FaucetError, FaucetResult},
3 leak,
4 networking::get_available_socket,
5 server::FaucetServerConfig,
6 shutdown::ShutdownSignal,
7};
8use std::{
9 ffi::OsStr,
10 net::SocketAddr,
11 path::Path,
12 sync::atomic::{AtomicBool, Ordering},
13 time::Duration,
14};
15use tokio::{process::Child, task::JoinHandle};
16use tokio_stream::StreamExt;
17use tokio_util::codec::{FramedRead, LinesCodec};
18
/// Kind of application a worker process serves.
///
/// The variant decides which spawn routine is used when a worker process is
/// started. Each variant can be deserialized from any of the serde aliases
/// listed on it.
#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Debug, serde::Deserialize)]
pub enum WorkerType {
    /// Worker started through `Rscript -e` running `plumber::pr_run(plumber::plumb())`.
    #[serde(alias = "plumber", alias = "Plumber")]
    Plumber,
    /// Worker started through `Rscript -e` running `shiny::runApp(..)`.
    #[serde(alias = "shiny", alias = "Shiny")]
    Shiny,
    /// Worker started through the Quarto executable with the `serve` sub-command.
    #[serde(alias = "quarto-shiny", alias = "QuartoShiny", alias = "quarto_shiny")]
    QuartoShiny,
}
28
29fn log_stdio(mut child: Child, target: &'static str) -> FaucetResult<Child> {
30 let pid = child.id().expect("Failed to get plumber worker PID");
31
32 let mut stdout = FramedRead::new(
33 child.stdout.take().ok_or(FaucetError::Unknown(format!(
34 "Unable to take stdout from PID {pid}"
35 )))?,
36 LinesCodec::new(),
37 );
38
39 let mut stderr = FramedRead::new(
40 child.stderr.take().ok_or(FaucetError::Unknown(format!(
41 "Unable to take stderr from PID {pid}"
42 )))?,
43 LinesCodec::new(),
44 );
45
46 tokio::spawn(async move {
47 while let Some(line) = stderr.next().await {
48 if let Ok(line) = line {
49 log::warn!(target: target, "{line}");
50 }
51 }
52 });
53
54 tokio::spawn(async move {
55 while let Some(line) = stdout.next().await {
56 if let Ok(line) = line {
57 log::info!(target: target, "{line}");
58 }
59 }
60 });
61
62 Ok(child)
63}
64
/// Settings describing a single worker process.
///
/// The struct is `Copy`; every borrowed field has a `'static` lifetime.
#[derive(Copy, Clone)]
pub struct WorkerConfig {
    /// Kind of application served; selects the spawn routine.
    pub wtype: WorkerType,
    /// Directory handed to `shiny::runApp`; `"."` is used when `None`.
    pub app_dir: Option<&'static str>,
    /// Program executed (with `-e <script>`) for Plumber and Shiny workers.
    pub rscript: &'static OsStr,
    /// Program executed (with `serve`) for Quarto Shiny workers.
    pub quarto: &'static OsStr,
    /// Working directory of the spawned process.
    pub workdir: &'static Path,
    /// Address probed to detect readiness; its port is passed to the worker.
    pub addr: SocketAddr,
    /// Log target used for the worker's forwarded output and in status messages.
    pub target: &'static str,
    /// Identifier exported to the process as the `FAUCET_WORKER_ID` environment variable.
    pub worker_id: usize,
    /// Route reported in the "online" log message; `"/"` is shown when `None`.
    pub worker_route: Option<&'static str>,
    /// Set to `true` once a TCP connection to `addr` succeeds and back to
    /// `false` when the process exits.
    pub is_online: &'static AtomicBool,
    /// Document passed to `quarto serve`; required for Quarto Shiny workers.
    pub qmd: Option<&'static Path>,
}
79
80impl WorkerConfig {
81 fn new(worker_id: usize, addr: SocketAddr, server_config: &FaucetServerConfig) -> Self {
82 Self {
83 addr,
84 worker_id,
85 is_online: leak!(AtomicBool::new(false)),
86 workdir: server_config.workdir,
87 worker_route: server_config.route,
88 target: leak!(format!("Worker::{}", worker_id)),
89 app_dir: server_config.app_dir,
90 wtype: server_config.server_type,
91 rscript: server_config.rscript,
92 quarto: server_config.quarto,
93 qmd: server_config.qmd,
94 }
95 }
96 #[allow(dead_code)]
97 pub fn dummy(target: &'static str, addr: &str, online: bool) -> WorkerConfig {
98 WorkerConfig {
99 target,
100 is_online: leak!(AtomicBool::new(online)),
101 addr: addr.parse().unwrap(),
102 app_dir: None,
103 worker_route: None,
104 rscript: OsStr::new(""),
105 wtype: crate::client::worker::WorkerType::Shiny,
106 worker_id: 1,
107 quarto: OsStr::new(""),
108 workdir: Path::new("."),
109 qmd: None,
110 }
111 }
112}
113
114fn spawn_child_rscript_process(
115 config: WorkerConfig,
116 command: impl AsRef<str>,
117) -> FaucetResult<Child> {
118 let mut cmd = tokio::process::Command::new(config.rscript);
119
120 cmd.current_dir(config.workdir)
122 .arg("-e")
123 .arg(command.as_ref())
124 .stdin(std::process::Stdio::null())
125 .stdout(std::process::Stdio::piped())
126 .stderr(std::process::Stdio::piped())
127 .env("FAUCET_WORKER_ID", config.worker_id.to_string())
128 .kill_on_drop(true);
130
131 #[cfg(unix)]
132 unsafe {
133 cmd.pre_exec(|| {
134 nix::libc::setpgid(0, 0);
136 Ok(())
137 });
138 }
139
140 cmd.spawn().map_err(Into::into)
141}
142
143fn spawn_plumber_worker(config: WorkerConfig) -> FaucetResult<Child> {
144 let command = format!(
145 r#"
146 options("plumber.port" = {port})
147 plumber::pr_run(plumber::plumb())
148 "#,
149 port = config.addr.port()
150 );
151 let child = spawn_child_rscript_process(config, command)?;
152
153 log_stdio(child, config.target)
154}
155
156fn spawn_shiny_worker(config: WorkerConfig) -> FaucetResult<Child> {
157 let command = format!(
158 r#"
159 options("shiny.port" = {port})
160 shiny::runApp("{app_dir}")
161 "#,
162 port = config.addr.port(),
163 app_dir = config.app_dir.unwrap_or(".")
164 );
165 let child = spawn_child_rscript_process(config, command)?;
166
167 log_stdio(child, config.target)
168}
169
170fn spawn_quarto_shiny_worker(config: WorkerConfig) -> FaucetResult<Child> {
171 let mut cmd = tokio::process::Command::new(config.quarto);
172 cmd.current_dir(config.workdir)
174 .arg("serve")
175 .args(["--port", config.addr.port().to_string().as_str()])
176 .arg(config.qmd.ok_or(FaucetError::MissingArgument("qmd"))?)
177 .stdin(std::process::Stdio::null())
178 .stdout(std::process::Stdio::piped())
179 .stderr(std::process::Stdio::piped())
180 .env("FAUCET_WORKER_ID", config.worker_id.to_string())
181 .kill_on_drop(true);
183
184 #[cfg(unix)]
185 unsafe {
186 cmd.pre_exec(|| {
187 nix::libc::setpgid(0, 0);
189 Ok(())
190 });
191 }
192
193 let child = cmd.spawn()?;
194
195 log_stdio(child, config.target)
196}
197
198impl WorkerConfig {
199 fn spawn_process(self, config: WorkerConfig) -> Child {
200 let child_result = match self.wtype {
201 WorkerType::Plumber => spawn_plumber_worker(config),
202 WorkerType::Shiny => spawn_shiny_worker(config),
203 WorkerType::QuartoShiny => spawn_quarto_shiny_worker(config),
204 };
205 match child_result {
206 Ok(child) => child,
207 Err(e) => {
208 log::error!(target: "faucet", "Failed to invoke R for {target}: {e}", target = config.target);
209 log::error!(target: "faucet", "Exiting...");
210 std::process::exit(1);
211 }
212 }
213 }
214}
215
/// A worker: its supervising task together with the configuration it was
/// started from.
pub struct Worker {
    /// Handle to the task that runs and restarts the worker process.
    pub child: WorkerChild,
    /// Configuration the worker was created with.
    pub config: WorkerConfig,
}
222
223async fn check_if_online(addr: SocketAddr) -> bool {
224 let stream = tokio::net::TcpStream::connect(addr).await;
225 stream.is_ok()
226}
227
/// Pause between two consecutive readiness probes of a starting worker.
const RECHECK_INTERVAL: Duration = Duration::from_millis(250);
229
/// Owner of the join handle of a worker's supervising task.
pub struct WorkerChild {
    /// Join handle of the task; `None` once `wait_until_done` has taken it.
    handle: Option<JoinHandle<FaucetResult<()>>>,
}
233
234impl WorkerChild {
235 pub async fn wait_until_done(&mut self) {
236 if let Some(handle) = self.handle.take() {
237 let _ = handle.await;
238 }
239 }
240}
241
/// Spawns the Tokio task that supervises one worker process and returns a
/// handle to that task.
///
/// The task starts the process described by `config`, probes `config.addr`
/// (sleeping `RECHECK_INTERVAL` between probes) until a TCP connection
/// succeeds or the process has exited, and then waits for the process to
/// exit. After an exit `config.is_online` is cleared and a new process is
/// started. When `shutdown.wait()` completes first, the current process is
/// killed and the task ends with `Ok(())`.
fn spawn_worker_task(config: WorkerConfig, shutdown: ShutdownSignal) -> WorkerChild {
    let handle = tokio::spawn(async move {
        // One iteration per process: `continue 'outer` starts a replacement,
        // `break 'outer` ends supervision.
        'outer: loop {
            let mut child = config.spawn_process(config);
            let pid = child.id().expect("Failed to get plumber worker PID");

            // Readiness probing followed by waiting for the process to exit;
            // resolves to the exit status (or the error that interrupted it).
            let child_loop = async {
                log::info!(target: "faucet", "Starting process {pid} for {target} on port {port}", port = config.addr.port(), target = config.target);
                loop {
                    let check_status = check_if_online(config.addr).await;
                    if check_status {
                        log::info!(target: "faucet", "{target} is online and ready to serve connections at {route}", target = config.target, route = config.worker_route.unwrap_or("/"));
                        config.is_online.store(check_status, Ordering::SeqCst);
                        break;
                    }
                    // Stop probing if the process has already exited.
                    if child.try_wait()?.is_some() {
                        break;
                    }

                    tokio::time::sleep(RECHECK_INTERVAL).await;
                }
                FaucetResult::Ok(child.wait().await?)
            };
            tokio::select! {
                _ = shutdown.wait() => {
                    // Best effort: a failure to kill the process is ignored.
                    let _ = child.kill().await;
                    log::info!(target: "faucet", "{target}'s process ({pid}) killed", target = config.target);
                    break 'outer;
                },
                status = child_loop => {
                    config
                        .is_online
                        .store(false, std::sync::atomic::Ordering::SeqCst);
                    // `status?` sits inside the macro arguments: an `Err` returns from
                    // the task here, so the worker is not restarted in that case.
                    // NOTE(review): confirm the `?` is still evaluated when
                    // error-level logging is disabled.
                    log::error!(target: "faucet", "{target}'s process ({}) exited with status {}", pid, status?, target = config.target);
                    continue 'outer;
                }
            }
        }
        FaucetResult::Ok(())
    });
    WorkerChild {
        handle: Some(handle),
    }
}
296
297impl Worker {
298 pub fn from_config(config: WorkerConfig, shutdown: ShutdownSignal) -> FaucetResult<Self> {
299 let child = spawn_worker_task(config, shutdown);
300 Ok(Self { child, config })
301 }
302}
303
/// The fixed set of workers started for a server.
pub struct Workers {
    /// All workers, in creation order (worker ids start at 1).
    pub workers: Box<[Worker]>,
}
307
/// Value passed to `get_available_socket` for every worker.
// NOTE(review): presumably the maximum number of attempts at finding a free
// socket — confirm against `networking::get_available_socket`.
const TRIES: usize = 20;
309
310impl Workers {
311 pub(crate) async fn new(
312 server_config: FaucetServerConfig,
313 shutdown: ShutdownSignal,
314 ) -> FaucetResult<Self> {
315 let mut workers = Vec::with_capacity(server_config.n_workers.get());
316
317 for id in 0..server_config.n_workers.get() {
318 let socket_addr = get_available_socket(TRIES).await?;
319 let config = WorkerConfig::new(id + 1, socket_addr, &server_config);
320 let worker = Worker::from_config(config, shutdown.clone())?;
321 workers.push(worker);
322 }
323
324 let workers = workers.into_boxed_slice();
325
326 Ok(Self { workers })
327 }
328 pub(crate) fn get_workers_config(&self) -> Vec<WorkerConfig> {
329 self.workers.iter().map(|w| w.config).collect()
330 }
331}