1use std::collections::HashMap;
4use std::ffi::OsStr;
5use std::fs;
6use std::fs::File;
7use std::process::Stdio;
8use std::sync::Arc;
9use std::sync::Mutex;
10
11use anyhow::Context;
12use anyhow::Result;
13use anyhow::bail;
14use crankshaft::engine::service::name::GeneratorIterator;
15use crankshaft::engine::service::name::UniqueAlphanumeric;
16use crankshaft::events::Event;
17use crankshaft::events::next_task_id;
18use crankshaft::events::send_event;
19use futures::FutureExt;
20use futures::future::BoxFuture;
21use nonempty::NonEmpty;
22use tokio::process::Command;
23use tokio::select;
24use tokio::sync::broadcast;
25use tokio_util::sync::CancellationToken;
26use tracing::info;
27use tracing::warn;
28
29use super::TaskExecutionBackend;
30use super::TaskExecutionConstraints;
31use crate::CancellationContext;
32use crate::EvaluationPath;
33use crate::Events;
34use crate::ONE_GIBIBYTE;
35use crate::PrimitiveValue;
36use crate::SYSTEM;
37use crate::TaskInputs;
38use crate::Value;
39use crate::backend::ExecuteTaskRequest;
40use crate::backend::INITIAL_EXPECTED_NAMES;
41use crate::backend::TaskExecutionResult;
42use crate::backend::manager::TaskManager;
43use crate::config::Config;
44use crate::config::DEFAULT_TASK_SHELL;
45use crate::config::TaskResourceLimitBehavior;
46use crate::convert_unit_string;
47use crate::http::Transferer;
48use crate::v1::requirements;
49
50struct LocalTask<'a> {
55 config: Arc<Config>,
57 request: ExecuteTaskRequest<'a>,
59 name: String,
61 events: Option<broadcast::Sender<Event>>,
63 cancellation: CancellationContext,
65}
66
67impl<'a> LocalTask<'a> {
68 async fn run(self) -> Result<Option<TaskExecutionResult>> {
72 let id = next_task_id();
73 let work_dir = self.request.work_dir();
74 let stdout_path = self.request.stdout_path();
75 let stderr_path = self.request.stderr_path();
76
77 let run = async {
78 fs::create_dir_all(&work_dir).with_context(|| {
80 format!(
81 "failed to create directory `{path}`",
82 path = work_dir.display()
83 )
84 })?;
85
86 let command_path = self.request.command_path();
88 fs::write(&command_path, self.request.command).with_context(|| {
89 format!(
90 "failed to write command contents to `{path}`",
91 path = command_path.display()
92 )
93 })?;
94
95 let stdout = File::create(&stdout_path).with_context(|| {
97 format!(
98 "failed to create stdout file `{path}`",
99 path = stdout_path.display()
100 )
101 })?;
102
103 let stderr = File::create(&stderr_path).with_context(|| {
105 format!(
106 "failed to create stderr file `{path}`",
107 path = stderr_path.display()
108 )
109 })?;
110
111 let mut command = Command::new(
112 self.config
113 .task
114 .shell
115 .as_deref()
116 .unwrap_or(DEFAULT_TASK_SHELL),
117 );
118 command
119 .current_dir(&work_dir)
120 .arg(command_path)
121 .stdin(Stdio::null())
122 .stdout(stdout)
123 .stderr(stderr)
124 .envs(
125 self.request
126 .env
127 .iter()
128 .map(|(k, v)| (OsStr::new(k), OsStr::new(v))),
129 )
130 .kill_on_drop(true);
131
132 #[cfg(windows)]
135 if let Ok(path) = std::env::var("PATH") {
136 command.env("PATH", path);
137 }
138
139 let mut child = command.spawn().context("failed to spawn shell")?;
140
141 send_event!(self.events, Event::TaskStarted { id });
143
144 let id = child.id().expect("should have id");
145 info!(
146 "spawned local shell process {id} for execution of task `{name}`",
147 name = self.name
148 );
149
150 let status = child.wait().await.with_context(|| {
151 format!("failed to wait for termination of task child process {id}")
152 })?;
153
154 #[cfg(unix)]
155 {
156 use std::os::unix::process::ExitStatusExt;
157 if let Some(signal) = status.signal() {
158 tracing::warn!("task process {id} has terminated with signal {signal}");
159
160 bail!(
161 "task child process {id} has terminated with signal {signal}; see stderr \
162 file `{path}` for more details",
163 path = stderr_path.display()
164 );
165 }
166 }
167
168 Ok(status)
169 };
170
171 let task_token = CancellationToken::new();
173 send_event!(
174 self.events,
175 Event::TaskCreated {
176 id,
177 name: self.name.clone(),
178 tes_id: None,
179 token: task_token.clone(),
180 }
181 );
182
183 let token = self.cancellation.second();
184
185 select! {
186 biased;
188 _ = task_token.cancelled() => {
189 send_event!(self.events, Event::TaskCanceled { id });
190 Ok(None)
191 }
192 _ = token.cancelled() => {
193 send_event!(self.events, Event::TaskCanceled { id });
194 Ok(None)
195 }
196 result = run => {
197 match result {
198 Ok(status) => {
199 send_event!(self.events, Event::TaskCompleted { id, exit_statuses: NonEmpty::new(status) });
200
201 let exit_code = status.code().expect("process should have exited");
202 info!("process {id} for task `{name}` has terminated with status code {exit_code}", name = self.name);
203 Ok(Some(TaskExecutionResult {
204 exit_code,
205 work_dir: EvaluationPath::from_local_path(work_dir),
206 stdout: PrimitiveValue::new_file(stdout_path.into_os_string().into_string().expect("path should be UTF-8")).into(),
207 stderr: PrimitiveValue::new_file(stderr_path.into_os_string().into_string().expect("path should be UTF-8")).into(),
208 }))
209 }
210 Err(e) => {
211 send_event!(self.events, Event::TaskFailed { id, message: format!("{e:#}") });
212 Err(e)
213 }
214 }
215 }
216 }
217 }
218}
219
220pub struct LocalBackend {
227 config: Arc<Config>,
229 cancellation: CancellationContext,
231 cpu: f64,
233 memory: u64,
235 manager: TaskManager,
237 names: Arc<Mutex<GeneratorIterator<UniqueAlphanumeric>>>,
239 events: Events,
241}
242
243impl LocalBackend {
244 pub fn new(
249 config: Arc<Config>,
250 events: Events,
251 cancellation: CancellationContext,
252 ) -> Result<Self> {
253 info!("initializing local backend");
254
255 let names = Arc::new(Mutex::new(GeneratorIterator::new(
256 UniqueAlphanumeric::default_with_expected_generations(INITIAL_EXPECTED_NAMES),
257 INITIAL_EXPECTED_NAMES,
258 )));
259
260 let backend_config = config.backend()?;
261 let backend_config = backend_config
262 .as_local()
263 .context("configured backend is not local")?;
264 let cpu = backend_config
265 .cpu
266 .map(|v| v as f64)
267 .unwrap_or_else(|| SYSTEM.cpus().len() as f64);
268 let memory = backend_config
269 .memory
270 .as_ref()
271 .map(|s| convert_unit_string(s).expect("value should be valid"))
272 .unwrap_or_else(|| SYSTEM.total_memory());
273 let manager = TaskManager::new(
274 cpu,
275 cpu,
276 memory,
277 memory,
278 events.clone(),
279 cancellation.clone(),
280 );
281
282 Ok(Self {
283 config,
284 cancellation,
285 cpu,
286 memory,
287 manager,
288 names,
289 events,
290 })
291 }
292}
293
294impl TaskExecutionBackend for LocalBackend {
295 fn constraints(
296 &self,
297 inputs: &TaskInputs,
298 requirements: &HashMap<String, Value>,
299 _: &HashMap<String, Value>,
300 ) -> Result<TaskExecutionConstraints> {
301 let mut cpu = requirements::cpu(inputs, requirements);
302 if self.cpu < cpu {
303 let env_specific = if self.config.suppress_env_specific_output {
304 String::new()
305 } else {
306 format!(
307 ", but the host only has {total_cpu} available",
308 total_cpu = self.cpu
309 )
310 };
311 match self.config.task.cpu_limit_behavior {
312 TaskResourceLimitBehavior::TryWithMax => {
313 warn!(
314 "task requires at least {cpu} CPU{s}{env_specific}",
315 s = if cpu == 1.0 { "" } else { "s" },
316 );
317 cpu = self.cpu;
319 }
320 TaskResourceLimitBehavior::Deny => {
321 bail!(
322 "task requires at least {cpu} CPU{s}{env_specific}",
323 s = if cpu == 1.0 { "" } else { "s" },
324 );
325 }
326 }
327 }
328
329 let mut memory = requirements::memory(inputs, requirements)? as u64;
330 if self.memory < memory as u64 {
331 let env_specific = if self.config.suppress_env_specific_output {
332 String::new()
333 } else {
334 format!(
335 ", but the host only has {total_memory} GiB available",
336 total_memory = self.memory as f64 / ONE_GIBIBYTE,
337 )
338 };
339 match self.config.task.memory_limit_behavior {
340 TaskResourceLimitBehavior::TryWithMax => {
341 warn!(
342 "task requires at least {memory} GiB of memory{env_specific}",
343 memory = memory as f64 / ONE_GIBIBYTE,
345 );
346 memory = self.memory;
348 }
349 TaskResourceLimitBehavior::Deny => {
350 bail!(
351 "task requires at least {memory} GiB of memory{env_specific}",
352 memory = memory as f64 / ONE_GIBIBYTE,
354 );
355 }
356 }
357 }
358
359 Ok(TaskExecutionConstraints {
360 container: None,
361 cpu,
362 memory,
363 gpu: Default::default(),
364 fpga: Default::default(),
365 disks: Default::default(),
366 })
367 }
368
369 fn guest_inputs_dir(&self) -> Option<&'static str> {
370 None
372 }
373
374 fn execute<'a>(
375 &'a self,
376 _: &'a Arc<dyn Transferer>,
377 request: ExecuteTaskRequest<'a>,
378 ) -> BoxFuture<'a, Result<Option<TaskExecutionResult>>> {
379 async move {
380 let name = format!(
381 "{id}-{generated}",
382 id = request.id,
383 generated = self
384 .names
385 .lock()
386 .expect("generator should always acquire")
387 .next()
388 .expect("generator should never be exhausted")
389 );
390
391 let cpu = request.constraints.cpu;
392 let memory = request.constraints.memory;
393
394 let task = LocalTask {
395 config: self.config.clone(),
396 request,
397 name,
398 events: self.events.crankshaft().clone(),
399 cancellation: self.cancellation.clone(),
400 };
401
402 self.manager.run(cpu, memory, task.run()).await
403 }
404 .boxed()
405 }
406}