1use std::collections::HashMap;
4use std::ffi::OsStr;
5use std::fs;
6use std::fs::File;
7use std::process::Stdio;
8use std::sync::Arc;
9use std::sync::Mutex;
10
11use anyhow::Context;
12use anyhow::Result;
13use anyhow::bail;
14use crankshaft::engine::service::name::GeneratorIterator;
15use crankshaft::engine::service::name::UniqueAlphanumeric;
16use crankshaft::events::Event;
17use crankshaft::events::next_task_id;
18use crankshaft::events::send_event;
19use nonempty::NonEmpty;
20use tokio::process::Command;
21use tokio::select;
22use tokio::sync::broadcast;
23use tokio::sync::oneshot;
24use tokio::sync::oneshot::Receiver;
25use tokio_util::sync::CancellationToken;
26use tracing::info;
27use tracing::warn;
28
29use super::TaskExecutionBackend;
30use super::TaskExecutionConstraints;
31use super::TaskManager;
32use super::TaskManagerRequest;
33use super::TaskSpawnRequest;
34use crate::COMMAND_FILE_NAME;
35use crate::ONE_GIBIBYTE;
36use crate::PrimitiveValue;
37use crate::STDERR_FILE_NAME;
38use crate::STDOUT_FILE_NAME;
39use crate::SYSTEM;
40use crate::TaskExecutionResult;
41use crate::Value;
42use crate::WORK_DIR_NAME;
43use crate::backend::INITIAL_EXPECTED_NAMES;
44use crate::config::Config;
45use crate::config::DEFAULT_TASK_SHELL;
46use crate::config::LocalBackendConfig;
47use crate::config::TaskResourceLimitBehavior;
48use crate::convert_unit_string;
49use crate::path::EvaluationPath;
50use crate::v1::cpu;
51use crate::v1::memory;
52
53#[derive(Debug)]
58struct LocalTaskRequest {
59 config: Arc<Config>,
61 inner: TaskSpawnRequest,
63 name: String,
65 cpu: f64,
69 memory: u64,
73 token: CancellationToken,
75 events: Option<broadcast::Sender<Event>>,
77}
78
79impl TaskManagerRequest for LocalTaskRequest {
80 fn cpu(&self) -> f64 {
81 self.cpu
82 }
83
84 fn memory(&self) -> u64 {
85 self.memory
86 }
87
88 async fn run(self) -> Result<TaskExecutionResult> {
89 let id = next_task_id();
90 let work_dir = self.inner.attempt_dir().join(WORK_DIR_NAME);
91 let stdout_path = self.inner.attempt_dir().join(STDOUT_FILE_NAME);
92 let stderr_path = self.inner.attempt_dir().join(STDERR_FILE_NAME);
93
94 let run = async {
95 fs::create_dir_all(&work_dir).with_context(|| {
97 format!(
98 "failed to create directory `{path}`",
99 path = work_dir.display()
100 )
101 })?;
102
103 let command_path = self.inner.attempt_dir().join(COMMAND_FILE_NAME);
105 fs::write(&command_path, self.inner.command()).with_context(|| {
106 format!(
107 "failed to write command contents to `{path}`",
108 path = command_path.display()
109 )
110 })?;
111
112 let stdout = File::create(&stdout_path).with_context(|| {
114 format!(
115 "failed to create stdout file `{path}`",
116 path = stdout_path.display()
117 )
118 })?;
119
120 let stderr = File::create(&stderr_path).with_context(|| {
122 format!(
123 "failed to create stderr file `{path}`",
124 path = stderr_path.display()
125 )
126 })?;
127
128 let mut command = Command::new(
129 self.config
130 .task
131 .shell
132 .as_deref()
133 .unwrap_or(DEFAULT_TASK_SHELL),
134 );
135 command
136 .current_dir(&work_dir)
137 .arg(command_path)
138 .stdin(Stdio::null())
139 .stdout(stdout)
140 .stderr(stderr)
141 .envs(
142 self.inner
143 .env()
144 .iter()
145 .map(|(k, v)| (OsStr::new(k), OsStr::new(v))),
146 )
147 .kill_on_drop(true);
148
149 #[cfg(windows)]
152 if let Ok(path) = std::env::var("PATH") {
153 command.env("PATH", path);
154 }
155
156 let mut child = command.spawn().context("failed to spawn shell")?;
157
158 send_event!(self.events, Event::TaskStarted { id });
160
161 let id = child.id().expect("should have id");
162 info!(
163 "spawned local shell process {id} for execution of task `{name}`",
164 name = self.name
165 );
166
167 let status = child.wait().await.with_context(|| {
168 format!("failed to wait for termination of task child process {id}")
169 })?;
170
171 #[cfg(unix)]
172 {
173 use std::os::unix::process::ExitStatusExt;
174 if let Some(signal) = status.signal() {
175 tracing::warn!("task process {id} has terminated with signal {signal}");
176
177 bail!(
178 "task child process {id} has terminated with signal {signal}; see stderr \
179 file `{path}` for more details",
180 path = stderr_path.display()
181 );
182 }
183 }
184
185 Ok(status)
186 };
187
188 send_event!(
190 self.events,
191 Event::TaskCreated {
192 id,
193 name: self.name.clone(),
194 tes_id: None,
195 token: self.token.clone(),
196 }
197 );
198
199 select! {
200 biased;
202
203 _ = self.token.cancelled() => {
204 send_event!(self.events, Event::TaskCanceled { id });
205 bail!("task was cancelled");
206 }
207 result = run => {
208 match result {
209 Ok(status) => {
210 send_event!(self.events, Event::TaskCompleted { id, exit_statuses: NonEmpty::new(status) });
211
212 let exit_code = status.code().expect("process should have exited");
213 info!("process {id} for task `{name}` has terminated with status code {exit_code}", name = self.name);
214 Ok(TaskExecutionResult {
215 exit_code,
216 work_dir: EvaluationPath::Local(work_dir),
217 stdout: PrimitiveValue::new_file(stdout_path.into_os_string().into_string().expect("path should be UTF-8")).into(),
218 stderr: PrimitiveValue::new_file(stderr_path.into_os_string().into_string().expect("path should be UTF-8")).into(),
219 })
220 }
221 Err(e) => {
222 send_event!(self.events, Event::TaskFailed { id, message: format!("{e:#}") });
223 Err(e)
224 }
225 }
226 }
227 }
228 }
229}
230
231pub struct LocalBackend {
238 config: Arc<Config>,
240 cpu: u64,
242 memory: u64,
244 manager: TaskManager<LocalTaskRequest>,
246 names: Arc<Mutex<GeneratorIterator<UniqueAlphanumeric>>>,
248 events: Option<broadcast::Sender<Event>>,
250}
251
252impl LocalBackend {
253 pub fn new(
258 config: Arc<Config>,
259 backend_config: &LocalBackendConfig,
260 events: Option<broadcast::Sender<Event>>,
261 ) -> Result<Self> {
262 info!("initializing local backend");
263
264 let names = Arc::new(Mutex::new(GeneratorIterator::new(
265 UniqueAlphanumeric::default_with_expected_generations(INITIAL_EXPECTED_NAMES),
266 INITIAL_EXPECTED_NAMES,
267 )));
268
269 let cpu = backend_config
270 .cpu
271 .unwrap_or_else(|| SYSTEM.cpus().len() as u64);
272 let memory = backend_config
273 .memory
274 .as_ref()
275 .map(|s| convert_unit_string(s).expect("value should be valid"))
276 .unwrap_or_else(|| SYSTEM.total_memory());
277 let manager = TaskManager::new(cpu, cpu, memory, memory);
278
279 Ok(Self {
280 config,
281 cpu,
282 memory,
283 manager,
284 names,
285 events,
286 })
287 }
288}
289
290impl TaskExecutionBackend for LocalBackend {
291 fn max_concurrency(&self) -> u64 {
292 self.cpu
293 }
294
295 fn constraints(
296 &self,
297 requirements: &HashMap<String, Value>,
298 _: &HashMap<String, Value>,
299 ) -> Result<TaskExecutionConstraints> {
300 let mut cpu = cpu(requirements);
301 if (self.cpu as f64) < cpu {
302 let env_specific = if self.config.suppress_env_specific_output {
303 String::new()
304 } else {
305 format!(
306 ", but the host only has {total_cpu} available",
307 total_cpu = self.cpu
308 )
309 };
310 match self.config.task.cpu_limit_behavior {
311 TaskResourceLimitBehavior::TryWithMax => {
312 warn!(
313 "task requires at least {cpu} CPU{s}{env_specific}",
314 s = if cpu == 1.0 { "" } else { "s" },
315 );
316 cpu = self.cpu as f64;
318 }
319 TaskResourceLimitBehavior::Deny => {
320 bail!(
321 "task requires at least {cpu} CPU{s}{env_specific}",
322 s = if cpu == 1.0 { "" } else { "s" },
323 );
324 }
325 }
326 }
327
328 let mut memory = memory(requirements)?;
329 if self.memory < memory as u64 {
330 let env_specific = if self.config.suppress_env_specific_output {
331 String::new()
332 } else {
333 format!(
334 ", but the host only has {total_memory} GiB available",
335 total_memory = self.memory as f64 / ONE_GIBIBYTE,
336 )
337 };
338 match self.config.task.memory_limit_behavior {
339 TaskResourceLimitBehavior::TryWithMax => {
340 warn!(
341 "task requires at least {memory} GiB of memory{env_specific}",
342 memory = memory as f64 / ONE_GIBIBYTE,
344 );
345 memory = self.memory.try_into().unwrap_or(i64::MAX);
347 }
348 TaskResourceLimitBehavior::Deny => {
349 bail!(
350 "task requires at least {memory} GiB of memory{env_specific}",
351 memory = memory as f64 / ONE_GIBIBYTE,
353 );
354 }
355 }
356 }
357
358 Ok(TaskExecutionConstraints {
359 container: None,
360 cpu,
361 memory,
362 gpu: Default::default(),
363 fpga: Default::default(),
364 disks: Default::default(),
365 })
366 }
367
368 fn guest_inputs_dir(&self) -> Option<&'static str> {
369 None
371 }
372
373 fn needs_local_inputs(&self) -> bool {
374 true
375 }
376
377 fn spawn(
378 &self,
379 request: TaskSpawnRequest,
380 token: CancellationToken,
381 ) -> Result<Receiver<Result<TaskExecutionResult>>> {
382 let (completed_tx, completed_rx) = oneshot::channel();
383
384 let requirements = request.requirements();
385 let mut cpu = cpu(requirements);
386 if let TaskResourceLimitBehavior::TryWithMax = self.config.task.cpu_limit_behavior {
387 cpu = std::cmp::min(cpu.ceil() as u64, self.cpu) as f64;
388 }
389 let mut memory = memory(requirements)? as u64;
390 if let TaskResourceLimitBehavior::TryWithMax = self.config.task.memory_limit_behavior {
391 memory = std::cmp::min(memory, self.memory);
392 }
393
394 let name = format!(
395 "{id}-{generated}",
396 id = request.id(),
397 generated = self
398 .names
399 .lock()
400 .expect("generator should always acquire")
401 .next()
402 .expect("generator should never be exhausted")
403 );
404
405 self.manager.send(
406 LocalTaskRequest {
407 config: self.config.clone(),
408 inner: request,
409 name,
410 cpu,
411 memory,
412 token,
413 events: self.events.clone(),
414 },
415 completed_tx,
416 );
417
418 Ok(completed_rx)
419 }
420}