1use std::collections::HashMap;
4use std::fs;
5use std::path::Path;
6use std::sync::Arc;
7use std::sync::Mutex;
8
9use anyhow::Context;
10use anyhow::Result;
11use anyhow::anyhow;
12use anyhow::bail;
13use crankshaft::config::backend;
14use crankshaft::engine::Task;
15use crankshaft::engine::service::name::GeneratorIterator;
16use crankshaft::engine::service::name::UniqueAlphanumeric;
17use crankshaft::engine::service::runner::Backend;
18use crankshaft::engine::service::runner::backend::docker;
19use crankshaft::engine::task::Execution;
20use crankshaft::engine::task::Input;
21use crankshaft::engine::task::Output;
22use crankshaft::engine::task::Resources;
23use crankshaft::engine::task::input::Contents;
24use crankshaft::engine::task::input::Type as InputType;
25use crankshaft::engine::task::output::Type as OutputType;
26use futures::FutureExt;
27use futures::future::BoxFuture;
28use nonempty::NonEmpty;
29use tokio::sync::oneshot;
30use tokio::task::JoinSet;
31use tokio_util::sync::CancellationToken;
32use tracing::info;
33use tracing::warn;
34use url::Url;
35
36use super::TaskExecutionBackend;
37use super::TaskExecutionConstraints;
38use super::TaskExecutionEvents;
39use super::TaskExecutionResult;
40use super::TaskManager;
41use super::TaskManagerRequest;
42use super::TaskSpawnRequest;
43use crate::COMMAND_FILE_NAME;
44use crate::InputTrie;
45use crate::ONE_GIBIBYTE;
46use crate::PrimitiveValue;
47use crate::STDERR_FILE_NAME;
48use crate::STDOUT_FILE_NAME;
49use crate::Value;
50use crate::WORK_DIR_NAME;
51use crate::config::Config;
52use crate::config::DEFAULT_TASK_SHELL;
53use crate::config::DockerBackendConfig;
54use crate::config::TaskResourceLimitBehavior;
55use crate::http::Downloader;
56use crate::http::HttpDownloader;
57use crate::http::Location;
58use crate::path::EvaluationPath;
59use crate::v1::container;
60use crate::v1::cpu;
61use crate::v1::max_cpu;
62use crate::v1::max_memory;
63use crate::v1::memory;
64
/// The number of name generations initially expected from the unique name
/// generator (used to size `UniqueAlphanumeric` and the `GeneratorIterator`).
const INITIAL_EXPECTED_NAMES: usize = 1000;

/// The guest (in-container) directory under which task inputs are mounted.
const GUEST_INPUTS_DIR: &str = "/mnt/task/inputs";

/// The guest path of the task's working directory mount.
const GUEST_WORK_DIR: &str = "/mnt/task/work";

/// The guest path of the file containing the task's command script.
const GUEST_COMMAND_PATH: &str = "/mnt/task/command";

/// The guest path to which the task's stdout is written.
const GUEST_STDOUT_PATH: &str = "/mnt/task/stdout";

/// The guest path to which the task's stderr is written.
const GUEST_STDERR_PATH: &str = "/mnt/task/stderr";
/// A request to run a task on the Docker backend.
#[derive(Debug)]
struct DockerTaskRequest {
    /// The engine configuration.
    config: Arc<Config>,
    /// The underlying task spawn request.
    inner: TaskSpawnRequest,
    /// The crankshaft Docker backend that will run the task.
    backend: Arc<docker::Backend>,
    /// The name of the task (request id plus a generated unique suffix).
    name: String,
    /// The container image to run the task in.
    container: String,
    /// The number of CPUs reserved for the task.
    cpu: f64,
    /// The amount of memory reserved for the task, in bytes
    /// (converted to GiB via `ONE_GIBIBYTE` when building resources).
    memory: u64,
    /// The optional CPU limit for the task, from hints.
    max_cpu: Option<f64>,
    /// The optional memory limit for the task, in bytes, from hints.
    max_memory: Option<u64>,
    /// The cancellation token used to abort the task.
    token: CancellationToken,
}
111
impl TaskManagerRequest for DockerTaskRequest {
    /// The CPUs reserved for this request.
    fn cpu(&self) -> f64 {
        self.cpu
    }

    /// The memory (bytes) reserved for this request.
    fn memory(&self) -> u64 {
        self.memory
    }

    /// Runs the task to completion on the Docker backend.
    ///
    /// Creates the attempt's working directory and command file on the host,
    /// builds the container mounts and outputs, submits the task, and maps the
    /// single execution's status into a `TaskExecutionResult`.
    ///
    /// `spawned` is signaled by the backend once the task has actually been
    /// spawned.
    async fn run(self, spawned: oneshot::Sender<()>) -> Result<TaskExecutionResult> {
        // Create the host-side working directory for this attempt.
        let work_dir = self.inner.attempt_dir().join(WORK_DIR_NAME);
        fs::create_dir_all(&work_dir).with_context(|| {
            format!(
                "failed to create directory `{path}`",
                path = work_dir.display()
            )
        })?;

        // Write the task's command script to the attempt directory so it can
        // be mounted into the container.
        let command_path = self.inner.attempt_dir().join(COMMAND_FILE_NAME);
        fs::write(&command_path, self.inner.command()).with_context(|| {
            format!(
                "failed to write command contents to `{path}`",
                path = command_path.display()
            )
        })?;

        // One mount per localized input, plus the work dir and command file
        // mounts added below (hence the `+ 2`).
        let mut inputs = Vec::with_capacity(self.inner.inputs().len() + 2);
        for input in self.inner.inputs().iter() {
            if let Some(guest_path) = input.guest_path() {
                let location = input.location().expect("all inputs should have localized");

                // Inputs whose localized path is missing on disk are skipped
                // rather than mounted.
                if location.exists() {
                    inputs.push(
                        Input::builder()
                            .path(guest_path)
                            .contents(Contents::Path(location.into()))
                            .ty(input.kind())
                            .read_only(true)
                            .build(),
                    );
                }
            }
        }

        // Mount the working directory read-write.
        inputs.push(
            Input::builder()
                .path(GUEST_WORK_DIR)
                .contents(Contents::Path(work_dir.to_path_buf()))
                .ty(InputType::Directory)
                .read_only(false)
                .build(),
        );

        // Mount the command file read-only.
        inputs.push(
            Input::builder()
                .path(GUEST_COMMAND_PATH)
                .contents(Contents::Path(command_path.to_path_buf()))
                .ty(InputType::File)
                .read_only(true)
                .build(),
        );

        let stdout_path = self.inner.attempt_dir().join(STDOUT_FILE_NAME);
        let stderr_path = self.inner.attempt_dir().join(STDERR_FILE_NAME);

        // The task's stdout/stderr are declared as outputs copied back to
        // host file URLs in the attempt directory.
        let outputs = vec![
            Output::builder()
                .path(GUEST_STDOUT_PATH)
                .url(Url::from_file_path(&stdout_path).expect("path should be absolute"))
                .ty(OutputType::File)
                .build(),
            Output::builder()
                .path(GUEST_STDERR_PATH)
                .url(Url::from_file_path(&stderr_path).expect("path should be absolute"))
                .ty(OutputType::File)
                .build(),
        ];

        // Build the crankshaft task: a single execution that runs the
        // configured shell (or the default) on the mounted command file.
        let task = Task::builder()
            .name(self.name)
            .executions(NonEmpty::new(
                Execution::builder()
                    .image(&self.container)
                    .program(
                        self.config
                            .task
                            .shell
                            .as_deref()
                            .unwrap_or(DEFAULT_TASK_SHELL),
                    )
                    .args([GUEST_COMMAND_PATH.to_string()])
                    .work_dir(GUEST_WORK_DIR)
                    .env({
                        // Remap any environment value that names a host input
                        // path to that input's guest path so it is valid
                        // inside the container.
                        let mut final_env = indexmap::IndexMap::new();
                        for (k, v) in self.inner.env() {
                            let guest_path = self
                                .inner
                                .inputs()
                                .iter()
                                .find(|input| input.path().to_str() == Some(v))
                                .and_then(|input| input.guest_path());

                            final_env.insert(k.clone(), guest_path.unwrap_or(v).to_string());
                        }
                        final_env
                    })
                    .stdout(GUEST_STDOUT_PATH)
                    .stderr(GUEST_STDERR_PATH)
                    .build(),
            ))
            .inputs(inputs)
            .outputs(outputs)
            .resources(
                Resources::builder()
                    .cpu(self.cpu)
                    .maybe_cpu_limit(self.max_cpu)
                    // RAM is expressed to crankshaft in GiB, not bytes.
                    .ram(self.memory as f64 / ONE_GIBIBYTE)
                    .maybe_ram_limit(self.max_memory.map(|m| m as f64 / ONE_GIBIBYTE))
                    .build(),
            )
            .build();

        // Submit and await the task; errors are flattened into anyhow with
        // alternate (`:#`) formatting to keep the full error chain.
        let statuses = self
            .backend
            .run(task, Some(spawned), self.token.clone())
            .map_err(|e| anyhow!("{e:#}"))?
            .await
            .map_err(|e| anyhow!("{e:#}"))?;

        // Exactly one execution was submitted, so exactly one status is
        // expected back.
        assert_eq!(statuses.len(), 1, "there should only be one exit status");
        let status = statuses.first();

        Ok(TaskExecutionResult {
            inputs: self.inner.info.inputs,
            exit_code: status.code().expect("should have exit code"),
            work_dir: EvaluationPath::Local(work_dir),
            stdout: PrimitiveValue::new_file(
                stdout_path
                    .into_os_string()
                    .into_string()
                    .expect("path should be UTF-8"),
            )
            .into(),
            stderr: PrimitiveValue::new_file(
                stderr_path
                    .into_os_string()
                    .into_string()
                    .expect("path should be UTF-8"),
            )
            .into(),
        })
    }
}
272
/// The Docker task execution backend.
pub struct DockerBackend {
    /// The engine configuration.
    config: Arc<Config>,
    /// The underlying crankshaft Docker backend.
    inner: Arc<docker::Backend>,
    /// The maximum task concurrency (the backend's reported CPU count).
    max_concurrency: u64,
    /// The maximum CPUs reported by the backend.
    max_cpu: u64,
    /// The maximum memory reported by the backend, in bytes
    /// (compared against `memory(requirements)` and divided by
    /// `ONE_GIBIBYTE` for display).
    max_memory: u64,
    /// The task manager that schedules `DockerTaskRequest`s.
    manager: TaskManager<DockerTaskRequest>,
    /// The shared generator of unique task name suffixes.
    generator: Arc<Mutex<GeneratorIterator<UniqueAlphanumeric>>>,
}
290
291impl DockerBackend {
292 pub async fn new(config: Arc<Config>, backend_config: &DockerBackendConfig) -> Result<Self> {
297 info!("initializing Docker backend");
298
299 let backend = docker::Backend::initialize_default_with(
300 backend::docker::Config::builder()
301 .cleanup(backend_config.cleanup)
302 .build(),
303 )
304 .await
305 .map_err(|e| anyhow!("{e:#}"))
306 .context("failed to initialize Docker backend")?;
307
308 let resources = *backend.resources();
309 let cpu = resources.cpu();
310 let max_cpu = resources.max_cpu();
311 let memory = resources.memory();
312 let max_memory = resources.max_memory();
313
314 let manager = if resources.use_service() {
318 TaskManager::new_unlimited(max_cpu, max_memory)
319 } else {
320 TaskManager::new(cpu, max_cpu, memory, max_memory)
321 };
322
323 Ok(Self {
324 config,
325 inner: Arc::new(backend),
326 max_concurrency: cpu,
327 max_cpu,
328 max_memory,
329 manager,
330 generator: Arc::new(Mutex::new(GeneratorIterator::new(
331 UniqueAlphanumeric::default_with_expected_generations(INITIAL_EXPECTED_NAMES),
332 INITIAL_EXPECTED_NAMES,
333 ))),
334 })
335 }
336}
337
impl TaskExecutionBackend for DockerBackend {
    /// The maximum number of tasks that may run concurrently.
    fn max_concurrency(&self) -> u64 {
        self.max_concurrency
    }

    /// Computes the execution constraints for a task from its requirements.
    ///
    /// The hints map (second parameter) is unused by this backend. Requested
    /// CPU and memory that exceed the backend's maximums are either clamped
    /// (`TryWithMax`, with a warning) or rejected (`Deny`), per the configured
    /// limit behaviors.
    fn constraints(
        &self,
        requirements: &HashMap<String, Value>,
        _: &HashMap<String, Value>,
    ) -> Result<TaskExecutionConstraints> {
        let container = container(requirements, self.config.task.container.as_deref());

        let mut cpu = cpu(requirements);
        if (self.max_cpu as f64) < cpu {
            // Omit backend-specific details from the message when configured
            // to suppress environment-specific output.
            let env_specific = if self.config.suppress_env_specific_output {
                String::new()
            } else {
                format!(
                    ", but the execution backend has a maximum of {max_cpu}",
                    max_cpu = self.max_cpu,
                )
            };
            match self.config.task.cpu_limit_behavior {
                TaskResourceLimitBehavior::TryWithMax => {
                    warn!(
                        "task requires at least {cpu} CPU{s}{env_specific}",
                        s = if cpu == 1.0 { "" } else { "s" },
                    );
                    // Clamp the request to the backend's maximum.
                    cpu = self.max_cpu as f64;
                }
                TaskResourceLimitBehavior::Deny => {
                    bail!(
                        "task requires at least {cpu} CPU{s}{env_specific}",
                        s = if cpu == 1.0 { "" } else { "s" },
                    );
                }
            }
        }

        let mut memory = memory(requirements)?;
        if self.max_memory < memory as u64 {
            let env_specific = if self.config.suppress_env_specific_output {
                String::new()
            } else {
                format!(
                    ", but the execution backend has a maximum of {max_memory} GiB",
                    max_memory = self.max_memory as f64 / ONE_GIBIBYTE,
                )
            };
            match self.config.task.memory_limit_behavior {
                TaskResourceLimitBehavior::TryWithMax => {
                    warn!(
                        "task requires at least {memory} GiB of memory{env_specific}",
                        memory = memory as f64 / ONE_GIBIBYTE,
                    );
                    // Clamp to the backend maximum, saturating at `i64::MAX`
                    // if the u64 doesn't fit in the (signed) memory value.
                    memory = self.max_memory.try_into().unwrap_or(i64::MAX);
                }
                TaskResourceLimitBehavior::Deny => {
                    bail!(
                        "task requires at least {memory} GiB of memory{env_specific}",
                        memory = memory as f64 / ONE_GIBIBYTE,
                    );
                }
            }
        }

        Ok(TaskExecutionConstraints {
            container: Some(container.into_owned()),
            cpu,
            memory,
            gpu: Default::default(),
            fpga: Default::default(),
            disks: Default::default(),
        })
    }

    /// The guest working directory used by this backend.
    fn guest_work_dir(&self) -> Option<&Path> {
        Some(Path::new(GUEST_WORK_DIR))
    }

    /// Localizes the given inputs for execution.
    ///
    /// Assigns each input a guest path under `GUEST_INPUTS_DIR` (computed via
    /// an `InputTrie`), then resolves locations: local paths are used as-is,
    /// while remote URLs are downloaded concurrently via the downloader.
    /// Fails if any download fails.
    fn localize_inputs<'a, 'b, 'c, 'd>(
        &'a self,
        downloader: &'b HttpDownloader,
        inputs: &'c mut [crate::eval::Input],
    ) -> BoxFuture<'d, Result<()>>
    where
        'a: 'd,
        'b: 'd,
        'c: 'd,
        Self: 'd,
    {
        async move {
            // Insert every input into the trie so guest paths can be
            // calculated without collisions.
            let mut trie = InputTrie::default();
            for input in inputs.iter() {
                trie.insert(input)?;
            }

            // The trie yields (input index, guest path) pairs.
            for (index, guest_path) in trie.calculate_guest_paths(GUEST_INPUTS_DIR)? {
                if let Some(input) = inputs.get_mut(index) {
                    input.set_guest_path(guest_path);
                } else {
                    bail!("invalid index {} returned from trie", index);
                }
            }

            // Download remote inputs concurrently; local inputs resolve
            // immediately to their own paths.
            let mut downloads = JoinSet::new();
            for (idx, input) in inputs.iter_mut().enumerate() {
                match input.path() {
                    EvaluationPath::Local(path) => {
                        input.set_location(Location::Path(path.clone().into()));
                    }
                    EvaluationPath::Remote(url) => {
                        let downloader = downloader.clone();
                        let url = url.clone();
                        // Each download task returns its input index so the
                        // result can be matched back to the input.
                        downloads.spawn(async move {
                            let location_result = downloader.download(&url).await;

                            match location_result {
                                Ok(location) => Ok((idx, location.into_owned())),
                                Err(e) => bail!("failed to localize `{url}`: {e:?}"),
                            }
                        });
                    }
                }
            }

            // Collect download results; any failure aborts localization.
            while let Some(result) = downloads.join_next().await {
                match result {
                    Ok(Ok((idx, location))) => {
                        inputs
                            .get_mut(idx)
                            .expect("index from should be valid")
                            .set_location(location);
                    }
                    // The download itself failed.
                    Ok(Err(e)) => {
                        bail!(e)
                    }
                    // The spawned task panicked or was cancelled.
                    Err(e) => {
                        bail!("download task failed: {e:?}")
                    }
                }
            }

            Ok(())
        }
        .boxed()
    }

    /// Spawns the given task request on the Docker backend.
    ///
    /// Resolves the container, CPU, and memory from requirements (clamping
    /// to backend maximums under `TryWithMax`), generates a unique task name,
    /// and hands the request to the task manager. Returns the spawned and
    /// completed event channels.
    fn spawn(
        &self,
        request: TaskSpawnRequest,
        token: CancellationToken,
    ) -> Result<TaskExecutionEvents> {
        let (spawned_tx, spawned_rx) = oneshot::channel();
        let (completed_tx, completed_rx) = oneshot::channel();

        let requirements = request.requirements();
        let hints = request.hints();

        let container = container(requirements, self.config.task.container.as_deref()).into_owned();
        let mut cpu = cpu(requirements);
        if let TaskResourceLimitBehavior::TryWithMax = self.config.task.cpu_limit_behavior {
            // Round the fractional request up for the integer comparison,
            // then clamp to the backend maximum.
            cpu = std::cmp::min(cpu.ceil() as u64, self.max_cpu) as f64;
        }
        let mut memory = memory(requirements)? as u64;
        if let TaskResourceLimitBehavior::TryWithMax = self.config.task.memory_limit_behavior {
            memory = std::cmp::min(memory, self.max_memory);
        }
        let max_cpu = max_cpu(hints);
        let max_memory = max_memory(hints)?.map(|i| i as u64);

        // Task name is the request id plus a unique generated suffix.
        let name = format!(
            "{id}-{generated}",
            id = request.id(),
            generated = self
                .generator
                .lock()
                .expect("generator should always acquire")
                .next()
                .expect("generator should never be exhausted")
        );
        self.manager.send(
            DockerTaskRequest {
                config: self.config.clone(),
                inner: request,
                backend: self.inner.clone(),
                name,
                container,
                cpu,
                memory,
                max_cpu,
                max_memory,
                token,
            },
            spawned_tx,
            completed_tx,
        );

        Ok(TaskExecutionEvents {
            spawned: spawned_rx,
            completed: completed_rx,
        })
    }

    /// Runs a cleanup task that `chown`s the output directory back to the
    /// current user (Unix only).
    ///
    /// Returns `None` when the output directory does not exist. The returned
    /// future logs (rather than propagates) any cleanup failure.
    #[cfg(unix)]
    fn cleanup<'a, 'b, 'c>(
        &'a self,
        output_dir: &'b Path,
        token: CancellationToken,
    ) -> Option<BoxFuture<'c, ()>>
    where
        'a: 'c,
        'b: 'c,
        Self: 'c,
    {
        /// The guest path where the output directory is mounted.
        const GUEST_OUT_DIR: &str = "/workflow_output";

        /// The CPUs reserved for the cleanup task.
        const CLEANUP_CPU: f64 = 0.1;

        /// The RAM reserved for the cleanup task.
        const CLEANUP_MEMORY: f64 = 0.05;

        let backend = self.inner.clone();
        let generator = self.generator.clone();
        let output_path = std::path::absolute(output_dir).expect("failed to get absolute path");
        if !output_path.is_dir() {
            info!("output directory does not exist: skipping cleanup");
            return None;
        }

        Some(
            async move {
                // Run the cleanup inside an inner async block so any error
                // can be logged in one place below.
                let result = async {
                    // SAFETY: `getuid` and `getgid` take no arguments, have
                    // no preconditions, and cannot fail.
                    let (uid, gid) = unsafe { (libc::getuid(), libc::getgid()) };
                    let ownership = format!("{uid}:{gid}");
                    // Mount the output directory read-write so `chown` can
                    // modify it.
                    let output_mount = Input::builder()
                        .path(GUEST_OUT_DIR)
                        .contents(Contents::Path(output_path.clone()))
                        .ty(InputType::Directory)
                        .read_only(false)
                        .build();

                    let name = format!(
                        "docker-backend-cleanup-{id}",
                        id = generator
                            .lock()
                            .expect("generator should always acquire")
                            .next()
                            .expect("generator should never be exhausted")
                    );

                    // A minimal Alpine container running a recursive chown of
                    // the mounted output directory.
                    let task = Task::builder()
                        .name(&name)
                        .executions(NonEmpty::new(
                            Execution::builder()
                                .image("alpine:latest")
                                .program("chown")
                                .args([
                                    "-R".to_string(),
                                    ownership.clone(),
                                    GUEST_OUT_DIR.to_string(),
                                ])
                                .work_dir("/")
                                .build(),
                        ))
                        .inputs([output_mount])
                        .resources(
                            Resources::builder()
                                .cpu(CLEANUP_CPU)
                                .ram(CLEANUP_MEMORY)
                                .build(),
                        )
                        .build();

                    info!(
                        "running cleanup task `{name}` to change ownership of `{path}` to \
                         `{ownership}`",
                        path = output_path.display(),
                    );

                    // The spawn notification is not needed here; drop the
                    // receiver.
                    let (spawned_tx, _) = oneshot::channel();
                    let output_rx = backend
                        .run(task, Some(spawned_tx), token)
                        .map_err(|e| anyhow!("failed to submit cleanup task: {e}"))?;

                    let statuses = output_rx
                        .await
                        .map_err(|e| anyhow!("failed to run cleanup task: {e}"))?;
                    let status = statuses.first();
                    if status.success() {
                        Ok(())
                    } else {
                        bail!(
                            "failed to chown output directory `{path}`",
                            path = output_path.display()
                        );
                    }
                }
                .await;

                // Cleanup is best-effort: failures are logged, not returned.
                if let Err(e) = result {
                    tracing::error!("cleanup task failed: {e:#}");
                }
            }
            .boxed(),
        )
    }

    /// Cleanup is unsupported on non-Unix platforms; always returns `None`.
    #[cfg(not(unix))]
    fn cleanup<'a, 'b, 'c>(&'a self, _: &'b Path, _: CancellationToken) -> Option<BoxFuture<'c, ()>>
    where
        'a: 'c,
        'b: 'c,
        Self: 'c,
    {
        tracing::debug!("cleanup task is not supported on this platform");
        None
    }
}
667}