1use std::collections::HashMap;
4use std::fs;
5use std::path::Path;
6use std::sync::Arc;
7use std::sync::Mutex;
8
9use anyhow::Context;
10use anyhow::Result;
11use anyhow::bail;
12use crankshaft::config::backend;
13use crankshaft::engine::Task;
14use crankshaft::engine::service::name::GeneratorIterator;
15use crankshaft::engine::service::name::UniqueAlphanumeric;
16use crankshaft::engine::service::runner::Backend;
17use crankshaft::engine::service::runner::backend::docker;
18use crankshaft::engine::task::Execution;
19use crankshaft::engine::task::Input;
20use crankshaft::engine::task::Output;
21use crankshaft::engine::task::Resources;
22use crankshaft::engine::task::input::Contents;
23use crankshaft::engine::task::input::Type as InputType;
24use crankshaft::engine::task::output::Type as OutputType;
25use crankshaft::events::Event;
26use futures::future::BoxFuture;
27use nonempty::NonEmpty;
28use tokio::sync::broadcast;
29use tokio::sync::oneshot;
30use tokio::sync::oneshot::Receiver;
31use tokio_util::sync::CancellationToken;
32use tracing::info;
33use tracing::warn;
34use url::Url;
35
36use super::TaskExecutionBackend;
37use super::TaskExecutionConstraints;
38use super::TaskExecutionResult;
39use super::TaskManager;
40use super::TaskManagerRequest;
41use super::TaskSpawnRequest;
42use crate::COMMAND_FILE_NAME;
43use crate::ONE_GIBIBYTE;
44use crate::PrimitiveValue;
45use crate::STDERR_FILE_NAME;
46use crate::STDOUT_FILE_NAME;
47use crate::Value;
48use crate::WORK_DIR_NAME;
49use crate::backend::INITIAL_EXPECTED_NAMES;
50use crate::config::Config;
51use crate::config::DEFAULT_TASK_SHELL;
52use crate::config::DockerBackendConfig;
53use crate::config::TaskResourceLimitBehavior;
54use crate::path::EvaluationPath;
55use crate::v1::container;
56use crate::v1::cpu;
57use crate::v1::max_cpu;
58use crate::v1::max_memory;
59use crate::v1::memory;
60
/// The guest directory under which task inputs are mounted in the container.
const GUEST_INPUTS_DIR: &str = "/mnt/task/inputs/";

/// The guest path of the task's working directory (mounted read-write).
const GUEST_WORK_DIR: &str = "/mnt/task/work";

/// The guest path of the file containing the task's command (mounted read-only).
const GUEST_COMMAND_PATH: &str = "/mnt/task/command";

/// The guest path to which the task's stdout is written.
const GUEST_STDOUT_PATH: &str = "/mnt/task/stdout";

/// The guest path to which the task's stderr is written.
const GUEST_STDERR_PATH: &str = "/mnt/task/stderr";
75
/// A request to run a single task on the Docker backend.
///
/// Carries everything needed to construct and submit one Crankshaft task.
#[derive(Debug)]
struct DockerTaskRequest {
    /// The evaluation configuration.
    config: Arc<Config>,
    /// The underlying task spawn request (command, inputs, env, attempt dir).
    inner: TaskSpawnRequest,
    /// The Crankshaft Docker backend used to run the task.
    backend: Arc<docker::Backend>,
    /// The unique name of the task.
    name: String,
    /// The container image to run the task in.
    container: String,
    /// The CPUs requested for the task.
    cpu: f64,
    /// The memory requested for the task, in bytes (converted to GiB when
    /// building the task's resources).
    memory: u64,
    /// The optional CPU limit (from hints) for the task.
    max_cpu: Option<f64>,
    /// The optional memory limit (from hints) for the task, in bytes.
    max_memory: Option<u64>,
    /// The cancellation token used to cancel the running task.
    token: CancellationToken,
}
101
impl TaskManagerRequest for DockerTaskRequest {
    /// The CPUs this request reserves with the task manager.
    fn cpu(&self) -> f64 {
        self.cpu
    }

    /// The memory (in bytes) this request reserves with the task manager.
    fn memory(&self) -> u64 {
        self.memory
    }

    /// Runs the task to completion in a Docker container.
    ///
    /// Creates the attempt's host-side work directory and command file, mounts
    /// the localized inputs into the container, submits the task to the
    /// Crankshaft backend, and returns the exit code along with the work
    /// directory and stdout/stderr file paths.
    async fn run(self) -> Result<TaskExecutionResult> {
        // Create the host-side working directory for this attempt; it is
        // bind-mounted read-write into the container at `GUEST_WORK_DIR`.
        let work_dir = self.inner.attempt_dir().join(WORK_DIR_NAME);
        fs::create_dir_all(&work_dir).with_context(|| {
            format!(
                "failed to create directory `{path}`",
                path = work_dir.display()
            )
        })?;

        // Write the task's command to a host file; it is mounted read-only at
        // `GUEST_COMMAND_PATH` and passed to the shell as its sole argument.
        let command_path = self.inner.attempt_dir().join(COMMAND_FILE_NAME);
        fs::write(&command_path, self.inner.command()).with_context(|| {
            format!(
                "failed to write command contents to `{path}`",
                path = command_path.display()
            )
        })?;

        // Capacity: one mount per task input plus the work directory and the
        // command file mounts added below.
        let mut inputs = Vec::with_capacity(self.inner.inputs().len() + 2);
        for input in self.inner.inputs().iter() {
            let guest_path = input.guest_path().expect("input should have guest path");
            let local_path = input.local_path().expect("input should be localized");

            // Docker would create a missing host path as a directory on mount;
            // fail up front instead so the error is actionable.
            if !local_path.exists() {
                bail!(
                    "cannot mount input `{path}` as it does not exist",
                    path = local_path.display()
                );
            }

            inputs.push(
                Input::builder()
                    .path(guest_path.as_str())
                    .contents(Contents::Path(local_path.into()))
                    .ty(input.kind())
                    .read_only(true)
                    .build(),
            );
        }

        // Mount the attempt's work directory read-write.
        inputs.push(
            Input::builder()
                .path(GUEST_WORK_DIR)
                .contents(Contents::Path(work_dir.to_path_buf()))
                .ty(InputType::Directory)
                .read_only(false)
                .build(),
        );

        // Mount the command file read-only.
        inputs.push(
            Input::builder()
                .path(GUEST_COMMAND_PATH)
                .contents(Contents::Path(command_path.to_path_buf()))
                .ty(InputType::File)
                .read_only(true)
                .build(),
        );

        let stdout_path = self.inner.attempt_dir().join(STDOUT_FILE_NAME);
        let stderr_path = self.inner.attempt_dir().join(STDERR_FILE_NAME);

        // Declare the guest stdout/stderr files as outputs so the backend
        // copies them back to the attempt directory on the host.
        let outputs = vec![
            Output::builder()
                .path(GUEST_STDOUT_PATH)
                .url(Url::from_file_path(&stdout_path).expect("path should be absolute"))
                .ty(OutputType::File)
                .build(),
            Output::builder()
                .path(GUEST_STDERR_PATH)
                .url(Url::from_file_path(&stderr_path).expect("path should be absolute"))
                .ty(OutputType::File)
                .build(),
        ];

        // Build a single-execution task: run the configured (or default) shell
        // with the mounted command file, inside the requested container image.
        let task = Task::builder()
            .name(self.name)
            .executions(NonEmpty::new(
                Execution::builder()
                    .image(self.container)
                    .program(
                        self.config
                            .task
                            .shell
                            .as_deref()
                            .unwrap_or(DEFAULT_TASK_SHELL),
                    )
                    .args([GUEST_COMMAND_PATH.to_string()])
                    .work_dir(GUEST_WORK_DIR)
                    .env(self.inner.env().clone())
                    .stdout(GUEST_STDOUT_PATH)
                    .stderr(GUEST_STDERR_PATH)
                    .build(),
            ))
            .inputs(inputs)
            .outputs(outputs)
            .resources(
                Resources::builder()
                    .cpu(self.cpu)
                    .maybe_cpu_limit(self.max_cpu)
                    // Crankshaft expects RAM in GiB; fields are in bytes.
                    .ram(self.memory as f64 / ONE_GIBIBYTE)
                    .maybe_ram_limit(self.max_memory.map(|m| m as f64 / ONE_GIBIBYTE))
                    .build(),
            )
            .build();

        let statuses = self.backend.run(task, self.token.clone())?.await?;

        // Exactly one execution was submitted, so exactly one status returns.
        assert_eq!(statuses.len(), 1, "there should only be one exit status");
        let status = statuses.first();

        Ok(TaskExecutionResult {
            exit_code: status.code().expect("should have exit code"),
            work_dir: EvaluationPath::Local(work_dir),
            stdout: PrimitiveValue::new_file(
                stdout_path
                    .into_os_string()
                    .into_string()
                    .expect("path should be UTF-8"),
            )
            .into(),
            stderr: PrimitiveValue::new_file(
                stderr_path
                    .into_os_string()
                    .into_string()
                    .expect("path should be UTF-8"),
            )
            .into(),
        })
    }
}
248
/// A task execution backend that runs tasks in Docker containers.
pub struct DockerBackend {
    /// The evaluation configuration.
    config: Arc<Config>,
    /// The underlying Crankshaft Docker backend.
    inner: Arc<docker::Backend>,
    /// The maximum number of tasks that may run concurrently.
    max_concurrency: u64,
    /// The maximum CPUs the backend can provide to a single task.
    max_cpu: u64,
    /// The maximum memory (in bytes) the backend can provide to a single task.
    max_memory: u64,
    /// The task manager that queues and dispatches task requests.
    manager: TaskManager<DockerTaskRequest>,
    /// Generator of unique alphanumeric task-name suffixes; shared with the
    /// Crankshaft backend.
    names: Arc<Mutex<GeneratorIterator<UniqueAlphanumeric>>>,
}
266
267impl DockerBackend {
268 pub async fn new(
273 config: Arc<Config>,
274 backend_config: &DockerBackendConfig,
275 events: Option<broadcast::Sender<Event>>,
276 ) -> Result<Self> {
277 info!("initializing Docker backend");
278
279 let names = Arc::new(Mutex::new(GeneratorIterator::new(
280 UniqueAlphanumeric::default_with_expected_generations(INITIAL_EXPECTED_NAMES),
281 INITIAL_EXPECTED_NAMES,
282 )));
283
284 let backend = docker::Backend::initialize_default_with(
285 backend::docker::Config::builder()
286 .cleanup(backend_config.cleanup)
287 .build(),
288 names.clone(),
289 events,
290 )
291 .await
292 .context("failed to initialize Docker backend")?;
293
294 let resources = *backend.resources();
295 let cpu = resources.cpu();
296 let max_cpu = resources.max_cpu();
297 let memory = resources.memory();
298 let max_memory = resources.max_memory();
299
300 let manager = if resources.use_service() {
304 TaskManager::new_unlimited(max_cpu, max_memory)
305 } else {
306 TaskManager::new(cpu, max_cpu, memory, max_memory)
307 };
308
309 Ok(Self {
310 config,
311 inner: Arc::new(backend),
312 max_concurrency: cpu,
313 max_cpu,
314 max_memory,
315 manager,
316 names,
317 })
318 }
319}
320
impl TaskExecutionBackend for DockerBackend {
    /// The maximum number of tasks this backend runs concurrently.
    fn max_concurrency(&self) -> u64 {
        self.max_concurrency
    }

    /// Computes the execution constraints for a task from its requirements.
    ///
    /// Requested CPU and memory that exceed the backend's maximums are either
    /// clamped down to the maximum (with a warning) or rejected outright,
    /// depending on the configured resource-limit behavior. Hints (the second
    /// map) are unused by this backend.
    fn constraints(
        &self,
        requirements: &HashMap<String, Value>,
        _: &HashMap<String, Value>,
    ) -> Result<TaskExecutionConstraints> {
        let container = container(requirements, self.config.task.container.as_deref());

        let mut cpu = cpu(requirements);
        if (self.max_cpu as f64) < cpu {
            // Optionally omit backend-specific details from the message so
            // output remains stable across environments.
            let env_specific = if self.config.suppress_env_specific_output {
                String::new()
            } else {
                format!(
                    ", but the execution backend has a maximum of {max_cpu}",
                    max_cpu = self.max_cpu,
                )
            };
            match self.config.task.cpu_limit_behavior {
                TaskResourceLimitBehavior::TryWithMax => {
                    warn!(
                        "task requires at least {cpu} CPU{s}{env_specific}",
                        s = if cpu == 1.0 { "" } else { "s" },
                    );
                    // Clamp the request to what the backend can provide.
                    cpu = self.max_cpu as f64;
                }
                TaskResourceLimitBehavior::Deny => {
                    bail!(
                        "task requires at least {cpu} CPU{s}{env_specific}",
                        s = if cpu == 1.0 { "" } else { "s" },
                    );
                }
            }
        }

        let mut memory = memory(requirements)?;
        if self.max_memory < memory as u64 {
            let env_specific = if self.config.suppress_env_specific_output {
                String::new()
            } else {
                format!(
                    ", but the execution backend has a maximum of {max_memory} GiB",
                    max_memory = self.max_memory as f64 / ONE_GIBIBYTE,
                )
            };
            match self.config.task.memory_limit_behavior {
                TaskResourceLimitBehavior::TryWithMax => {
                    warn!(
                        "task requires at least {memory} GiB of memory{env_specific}",
                        memory = memory as f64 / ONE_GIBIBYTE,
                    );
                    // Clamp to the backend maximum; saturate rather than
                    // overflow if the u64 maximum doesn't fit in i64.
                    memory = self.max_memory.try_into().unwrap_or(i64::MAX);
                }
                TaskResourceLimitBehavior::Deny => {
                    bail!(
                        "task requires at least {memory} GiB of memory{env_specific}",
                        memory = memory as f64 / ONE_GIBIBYTE,
                    );
                }
            }
        }

        Ok(TaskExecutionConstraints {
            container: Some(container.into_owned()),
            cpu,
            memory,
            gpu: Default::default(),
            fpga: Default::default(),
            disks: Default::default(),
        })
    }

    /// The guest directory under which inputs are mounted in the container.
    fn guest_inputs_dir(&self) -> Option<&'static str> {
        Some(GUEST_INPUTS_DIR)
    }

    /// Inputs must be present on the local filesystem so they can be
    /// bind-mounted into the container.
    fn needs_local_inputs(&self) -> bool {
        true
    }

    /// Spawns a task, returning a receiver for its execution result.
    ///
    /// Resolves the container image and resource requests (clamping to backend
    /// maximums when configured to do so), generates a unique task name, and
    /// hands the request to the task manager for scheduling.
    fn spawn(
        &self,
        request: TaskSpawnRequest,
        token: CancellationToken,
    ) -> Result<Receiver<Result<TaskExecutionResult>>> {
        let (completed_tx, completed_rx) = oneshot::channel();

        let requirements = request.requirements();
        let hints = request.hints();

        let container = container(requirements, self.config.task.container.as_deref()).into_owned();
        let mut cpu = cpu(requirements);
        if let TaskResourceLimitBehavior::TryWithMax = self.config.task.cpu_limit_behavior {
            // Round up fractional requests before clamping to the backend max.
            cpu = std::cmp::min(cpu.ceil() as u64, self.max_cpu) as f64;
        }
        let mut memory = memory(requirements)? as u64;
        if let TaskResourceLimitBehavior::TryWithMax = self.config.task.memory_limit_behavior {
            memory = std::cmp::min(memory, self.max_memory);
        }
        let max_cpu = max_cpu(hints);
        let max_memory = max_memory(hints)?.map(|i| i as u64);

        // Name the task `<id>-<unique suffix>` so repeated attempts of the
        // same task id remain distinguishable.
        let name = format!(
            "{id}-{generated}",
            id = request.id(),
            generated = self
                .names
                .lock()
                .expect("generator should always acquire")
                .next()
                .expect("generator should never be exhausted")
        );
        self.manager.send(
            DockerTaskRequest {
                config: self.config.clone(),
                inner: request,
                backend: self.inner.clone(),
                name,
                container,
                cpu,
                memory,
                max_cpu,
                max_memory,
                token,
            },
            completed_tx,
        );

        Ok(completed_rx)
    }

    /// Runs a best-effort cleanup task that chowns the output directory back
    /// to the current user (files created by the container may be root-owned).
    ///
    /// Returns `None` when the output directory does not exist; failures are
    /// logged rather than propagated.
    #[cfg(unix)]
    fn cleanup<'a, 'b, 'c>(
        &'a self,
        output_dir: &'b Path,
        token: CancellationToken,
    ) -> Option<BoxFuture<'c, ()>>
    where
        'a: 'c,
        'b: 'c,
        Self: 'c,
    {
        use anyhow::anyhow;
        use futures::FutureExt;

        /// The guest path at which the output directory is mounted.
        const GUEST_OUT_DIR: &str = "/workflow_output";

        /// CPUs requested for the cleanup task.
        const CLEANUP_CPU: f64 = 0.1;

        /// RAM (in GiB) requested for the cleanup task.
        const CLEANUP_MEMORY: f64 = 0.05;

        let backend = self.inner.clone();
        let names = self.names.clone();
        let output_path = std::path::absolute(output_dir).expect("failed to get absolute path");
        if !output_path.is_dir() {
            info!("output directory does not exist: skipping cleanup");
            return None;
        }

        Some(
            async move {
                // Run the fallible work in an inner future so all errors can
                // be logged in one place below.
                let result = async {
                    // SAFETY: `getuid` and `getgid` take no arguments, cannot
                    // fail, and have no soundness preconditions.
                    let (uid, gid) = unsafe { (libc::getuid(), libc::getgid()) };
                    let ownership = format!("{uid}:{gid}");
                    // Mount the output directory read-write so `chown` can
                    // modify it.
                    let output_mount = Input::builder()
                        .path(GUEST_OUT_DIR)
                        .contents(Contents::Path(output_path.clone()))
                        .ty(InputType::Directory)
                        .read_only(false)
                        .build();

                    let name = format!(
                        "docker-backend-cleanup-{id}",
                        id = names
                            .lock()
                            .expect("generator should always acquire")
                            .next()
                            .expect("generator should never be exhausted")
                    );

                    // Recursively chown the mounted output directory to the
                    // invoking user from inside a minimal container.
                    let task = Task::builder()
                        .name(&name)
                        .executions(NonEmpty::new(
                            Execution::builder()
                                .image("alpine:latest")
                                .program("chown")
                                .args([
                                    "-R".to_string(),
                                    ownership.clone(),
                                    GUEST_OUT_DIR.to_string(),
                                ])
                                .work_dir("/")
                                .build(),
                        ))
                        .inputs([output_mount])
                        .resources(
                            Resources::builder()
                                .cpu(CLEANUP_CPU)
                                .ram(CLEANUP_MEMORY)
                                .build(),
                        )
                        .build();

                    info!(
                        "running cleanup task `{name}` to change ownership of `{path}` to \
                         `{ownership}`",
                        path = output_path.display(),
                    );

                    let output_rx = backend
                        .run(task, token)
                        .map_err(|e| anyhow!("failed to submit cleanup task: {e}"))?;

                    let statuses = output_rx
                        .await
                        .map_err(|e| anyhow!("failed to run cleanup task: {e}"))?;
                    let status = statuses.first();
                    if status.success() {
                        Ok(())
                    } else {
                        bail!(
                            "failed to chown output directory `{path}`",
                            path = output_path.display()
                        );
                    }
                }
                .await;

                // Cleanup is best-effort: log failures instead of surfacing them.
                if let Err(e) = result {
                    tracing::error!("cleanup task failed: {e:#}");
                }
            }
            .boxed(),
        )
    }

    /// Cleanup is a no-op on non-Unix platforms (no uid/gid to chown to).
    #[cfg(not(unix))]
    fn cleanup<'a, 'b, 'c>(&'a self, _: &'b Path, _: CancellationToken) -> Option<BoxFuture<'c, ()>>
    where
        'a: 'c,
        'b: 'c,
        Self: 'c,
    {
        tracing::debug!("cleanup task is not supported on this platform");
        None
    }
}