1use std::collections::HashMap;
4use std::fs;
5use std::path::Path;
6use std::sync::Arc;
7use std::sync::Mutex;
8
9use anyhow::Context;
10use anyhow::Result;
11use anyhow::anyhow;
12use anyhow::bail;
13use crankshaft::config::backend;
14use crankshaft::engine::Task;
15use crankshaft::engine::service::name::GeneratorIterator;
16use crankshaft::engine::service::name::UniqueAlphanumeric;
17use crankshaft::engine::service::runner::Backend;
18use crankshaft::engine::service::runner::backend::docker;
19use crankshaft::engine::task::Execution;
20use crankshaft::engine::task::Input;
21use crankshaft::engine::task::Output;
22use crankshaft::engine::task::Resources;
23use crankshaft::engine::task::input::Contents;
24use crankshaft::engine::task::input::Type as InputType;
25use crankshaft::engine::task::output::Type as OutputType;
26use futures::FutureExt;
27use futures::future::BoxFuture;
28use nonempty::NonEmpty;
29use tokio::sync::oneshot;
30use tokio::task::JoinSet;
31use tokio_util::sync::CancellationToken;
32use tracing::info;
33use url::Url;
34
35use super::TaskExecutionBackend;
36use super::TaskExecutionConstraints;
37use super::TaskExecutionEvents;
38use super::TaskExecutionResult;
39use super::TaskManager;
40use super::TaskManagerRequest;
41use super::TaskSpawnRequest;
42use crate::COMMAND_FILE_NAME;
43use crate::InputTrie;
44use crate::ONE_GIBIBYTE;
45use crate::PrimitiveValue;
46use crate::STDERR_FILE_NAME;
47use crate::STDOUT_FILE_NAME;
48use crate::Value;
49use crate::WORK_DIR_NAME;
50use crate::config::Config;
51use crate::config::DEFAULT_TASK_SHELL;
52use crate::config::DockerBackendConfig;
53use crate::http::Downloader;
54use crate::http::HttpDownloader;
55use crate::http::Location;
56use crate::path::EvaluationPath;
57use crate::v1::container;
58use crate::v1::cpu;
59use crate::v1::max_cpu;
60use crate::v1::max_memory;
61use crate::v1::memory;
62
/// The expected number of task names to be generated.
///
/// Used both as the expected-generation hint for `UniqueAlphanumeric` and as
/// the capacity hint for the wrapping `GeneratorIterator`.
const INITIAL_EXPECTED_NAMES: usize = 1000;

/// The guest directory under which task inputs are mounted.
const GUEST_INPUTS_DIR: &str = "/mnt/task/inputs";

/// The guest path of the task's working directory (mounted read-write).
const GUEST_WORK_DIR: &str = "/mnt/task/work";

/// The guest path of the task's command script (mounted read-only).
const GUEST_COMMAND_PATH: &str = "/mnt/task/command";

/// The guest path to which the task execution's stdout is written.
const GUEST_STDOUT_PATH: &str = "/mnt/task/stdout";

/// The guest path to which the task execution's stderr is written.
const GUEST_STDERR_PATH: &str = "/mnt/task/stderr";
83
/// Represents a request to run a task on the Docker backend.
#[derive(Debug)]
struct DockerTaskRequest {
    /// The evaluation configuration (used here for the task shell override).
    config: Arc<Config>,
    /// The underlying task spawn request.
    inner: TaskSpawnRequest,
    /// The crankshaft Docker backend that will run the task.
    backend: Arc<docker::Backend>,
    /// The unique name of the task.
    name: String,
    /// The container image to use for the task.
    container: String,
    /// The number of CPUs requested for the task.
    cpu: f64,
    /// The amount of memory requested for the task, in bytes
    /// (converted to GiB via `ONE_GIBIBYTE` when building resources).
    memory: u64,
    /// The optional CPU limit for the task (taken from the task's hints).
    max_cpu: Option<f64>,
    /// The optional memory limit for the task, in bytes (taken from the
    /// task's hints).
    max_memory: Option<u64>,
    /// The cancellation token used to abort the task.
    token: CancellationToken,
}
109
impl TaskManagerRequest for DockerTaskRequest {
    /// The number of CPUs this request reserves.
    fn cpu(&self) -> f64 {
        self.cpu
    }

    /// The amount of memory (in bytes) this request reserves.
    fn memory(&self) -> u64 {
        self.memory
    }

    /// Runs the task in a Docker container via the crankshaft backend.
    ///
    /// Creates the attempt's host-side work directory and command file, mounts
    /// them (plus all localized inputs) into the guest, executes the command
    /// with the configured shell, and returns the execution result. `spawned`
    /// is signaled by the backend once the container has started.
    async fn run(self, spawned: oneshot::Sender<()>) -> Result<TaskExecutionResult> {
        // Create the host-side working directory for this attempt.
        let work_dir = self.inner.attempt_dir().join(WORK_DIR_NAME);
        fs::create_dir_all(&work_dir).with_context(|| {
            format!(
                "failed to create directory `{path}`",
                path = work_dir.display()
            )
        })?;

        // Write the task's command script into the attempt directory so it
        // can be mounted into the container.
        let command_path = self.inner.attempt_dir().join(COMMAND_FILE_NAME);
        fs::write(&command_path, self.inner.command()).with_context(|| {
            format!(
                "failed to write command contents to `{path}`",
                path = command_path.display()
            )
        })?;

        // Build the container mounts: one per localized input, plus the work
        // directory and command file (hence the `+ 2` capacity).
        let mut inputs = Vec::with_capacity(self.inner.inputs().len() + 2);
        for input in self.inner.inputs().iter() {
            if let Some(guest_path) = input.guest_path() {
                let location = input.location().expect("all inputs should have localized");

                // Inputs whose localized path is missing on the host are
                // silently skipped rather than mounted.
                if location.exists() {
                    inputs.push(
                        Input::builder()
                            .path(guest_path)
                            .contents(Contents::Path(location.into()))
                            .ty(input.kind())
                            .read_only(true)
                            .build(),
                    );
                }
            }
        }

        // The work directory is the only writable mount.
        inputs.push(
            Input::builder()
                .path(GUEST_WORK_DIR)
                .contents(Contents::Path(work_dir.to_path_buf()))
                .ty(InputType::Directory)
                .read_only(false)
                .build(),
        );

        inputs.push(
            Input::builder()
                .path(GUEST_COMMAND_PATH)
                .contents(Contents::Path(command_path.to_path_buf()))
                .ty(InputType::File)
                .read_only(true)
                .build(),
        );

        // Guest stdout/stderr are copied back to these host paths as outputs.
        let stdout_path = self.inner.attempt_dir().join(STDOUT_FILE_NAME);
        let stderr_path = self.inner.attempt_dir().join(STDERR_FILE_NAME);

        let outputs = vec![
            Output::builder()
                .path(GUEST_STDOUT_PATH)
                .url(Url::from_file_path(&stdout_path).expect("path should be absolute"))
                .ty(OutputType::File)
                .build(),
            Output::builder()
                .path(GUEST_STDERR_PATH)
                .url(Url::from_file_path(&stderr_path).expect("path should be absolute"))
                .ty(OutputType::File)
                .build(),
        ];

        let task = Task::builder()
            .name(self.name)
            .executions(NonEmpty::new(
                Execution::builder()
                    .image(&self.container)
                    .program(
                        // Use the configured shell, falling back to the
                        // default task shell.
                        self.config
                            .task
                            .shell
                            .as_deref()
                            .unwrap_or(DEFAULT_TASK_SHELL),
                    )
                    // NOTE(review): `-C` enables noclobber in POSIX shells;
                    // the command path is then run as the script operand.
                    // Confirm `-C` (vs `-c`) is intentional here.
                    .args(["-C".to_string(), GUEST_COMMAND_PATH.to_string()])
                    .work_dir(GUEST_WORK_DIR)
                    .env({
                        // Map host paths in environment values to their guest
                        // paths when the value matches a mounted input.
                        let mut final_env = indexmap::IndexMap::new();
                        for (k, v) in self.inner.env() {
                            let guest_path = self
                                .inner
                                .inputs()
                                .iter()
                                .find(|input| input.path().to_str() == Some(v))
                                .and_then(|input| input.guest_path());

                            final_env.insert(k.clone(), guest_path.unwrap_or(v).to_string());
                        }
                        final_env
                    })
                    .stdout(GUEST_STDOUT_PATH)
                    .stderr(GUEST_STDERR_PATH)
                    .build(),
            ))
            .inputs(inputs)
            .outputs(outputs)
            .resources(
                Resources::builder()
                    .cpu(self.cpu)
                    .maybe_cpu_limit(self.max_cpu)
                    // Crankshaft expects RAM in GiB; `memory` is in bytes.
                    .ram(self.memory as f64 / ONE_GIBIBYTE)
                    .maybe_ram_limit(self.max_memory.map(|m| m as f64 / ONE_GIBIBYTE))
                    .build(),
            )
            .build();

        // Submit the task and await completion; errors from crankshaft are
        // flattened into anyhow errors.
        let statuses = self
            .backend
            .run(task, Some(spawned), self.token.clone())
            .map_err(|e| anyhow!("{e:#}"))?
            .await
            .map_err(|e| anyhow!("{e:#}"))?;

        // A single execution was submitted, so exactly one status is expected.
        assert_eq!(statuses.len(), 1, "there should only be one exit status");
        let status = statuses.first();

        Ok(TaskExecutionResult {
            inputs: self.inner.info.inputs,
            exit_code: status.code().expect("should have exit code"),
            work_dir: EvaluationPath::Local(work_dir),
            stdout: PrimitiveValue::new_file(
                stdout_path
                    .into_os_string()
                    .into_string()
                    .expect("path should be UTF-8"),
            )
            .into(),
            stderr: PrimitiveValue::new_file(
                stderr_path
                    .into_os_string()
                    .into_string()
                    .expect("path should be UTF-8"),
            )
            .into(),
        })
    }
}
270
/// Represents the Docker task execution backend.
pub struct DockerBackend {
    /// The evaluation configuration.
    config: Arc<Config>,
    /// The underlying crankshaft Docker backend.
    inner: Arc<docker::Backend>,
    /// The maximum task concurrency (the backend's reported CPU count).
    max_concurrency: u64,
    /// The maximum CPUs reported by the backend for any one task.
    max_cpu: u64,
    /// The maximum memory (in bytes) reported by the backend for any one task.
    max_memory: u64,
    /// The task manager that schedules `DockerTaskRequest`s.
    manager: TaskManager<DockerTaskRequest>,
    /// The generator of unique task names (shared with cleanup tasks).
    generator: Arc<Mutex<GeneratorIterator<UniqueAlphanumeric>>>,
}
288
289impl DockerBackend {
290 pub async fn new(config: Arc<Config>, backend_config: &DockerBackendConfig) -> Result<Self> {
295 info!("initializing Docker backend");
296
297 let backend = docker::Backend::initialize_default_with(
298 backend::docker::Config::builder()
299 .cleanup(backend_config.cleanup)
300 .build(),
301 )
302 .await
303 .map_err(|e| anyhow!("{e:#}"))
304 .context("failed to initialize Docker backend")?;
305
306 let resources = *backend.resources();
307 let cpu = resources.cpu();
308 let max_cpu = resources.max_cpu();
309 let memory = resources.memory();
310 let max_memory = resources.max_memory();
311
312 let manager = if resources.use_service() {
316 TaskManager::new_unlimited(max_cpu, max_memory)
317 } else {
318 TaskManager::new(cpu, max_cpu, memory, max_memory)
319 };
320
321 Ok(Self {
322 config,
323 inner: Arc::new(backend),
324 max_concurrency: cpu,
325 max_cpu,
326 max_memory,
327 manager,
328 generator: Arc::new(Mutex::new(GeneratorIterator::new(
329 UniqueAlphanumeric::default_with_expected_generations(INITIAL_EXPECTED_NAMES),
330 INITIAL_EXPECTED_NAMES,
331 ))),
332 })
333 }
334}
335
impl TaskExecutionBackend for DockerBackend {
    /// The maximum number of concurrent tasks (the backend's CPU count).
    fn max_concurrency(&self) -> u64 {
        self.max_concurrency
    }

    /// Computes the execution constraints for a task, validating its CPU and
    /// memory requirements against the backend's reported maximums.
    ///
    /// # Errors
    ///
    /// Returns an error if the task requires more CPU or memory than the
    /// backend can provide.
    fn constraints(
        &self,
        requirements: &HashMap<String, Value>,
        _: &HashMap<String, Value>,
    ) -> Result<TaskExecutionConstraints> {
        let container = container(requirements, self.config.task.container.as_deref());

        let cpu = cpu(requirements);
        if (self.max_cpu as f64) < cpu {
            bail!(
                "task requires at least {cpu} CPU{s}, but the execution backend has a maximum of \
                 {max_cpu}",
                s = if cpu == 1.0 { "" } else { "s" },
                max_cpu = self.max_cpu,
            );
        }

        let memory = memory(requirements)?;
        if self.max_memory < memory as u64 {
            // Report both quantities in GiB for readability.
            let memory = memory as f64 / ONE_GIBIBYTE;
            let max_memory = self.max_memory as f64 / ONE_GIBIBYTE;

            bail!(
                "task requires at least {memory} GiB of memory, but the execution backend has a \
                 maximum of {max_memory} GiB",
            );
        }

        Ok(TaskExecutionConstraints {
            container: Some(container.into_owned()),
            cpu,
            memory,
            gpu: Default::default(),
            fpga: Default::default(),
            disks: Default::default(),
        })
    }

    /// The guest working directory for tasks run by this backend.
    fn guest_work_dir(&self) -> Option<&Path> {
        Some(Path::new(GUEST_WORK_DIR))
    }

    /// Localizes the given inputs: assigns each input a guest path under
    /// `GUEST_INPUTS_DIR` and resolves each input's location, downloading
    /// remote inputs concurrently.
    fn localize_inputs<'a, 'b, 'c, 'd>(
        &'a self,
        downloader: &'b HttpDownloader,
        inputs: &'c mut [crate::eval::Input],
    ) -> BoxFuture<'d, Result<()>>
    where
        'a: 'd,
        'b: 'd,
        'c: 'd,
        Self: 'd,
    {
        async move {
            // Build a trie of all input paths so that guest paths can be
            // calculated without collisions.
            let mut trie = InputTrie::default();
            for input in inputs.iter() {
                trie.insert(input)?;
            }

            for (index, guest_path) in trie.calculate_guest_paths(GUEST_INPUTS_DIR)? {
                if let Some(input) = inputs.get_mut(index) {
                    input.set_guest_path(guest_path);
                } else {
                    bail!("invalid index {} returned from trie", index);
                }
            }

            // Local inputs resolve immediately; remote inputs are downloaded
            // concurrently, each task yielding its input index and location.
            let mut downloads = JoinSet::new();
            for (idx, input) in inputs.iter_mut().enumerate() {
                match input.path() {
                    EvaluationPath::Local(path) => {
                        input.set_location(Location::Path(path.clone().into()));
                    }
                    EvaluationPath::Remote(url) => {
                        let downloader = downloader.clone();
                        let url = url.clone();
                        downloads.spawn(async move {
                            let location_result = downloader.download(&url).await;

                            match location_result {
                                Ok(location) => Ok((idx, location.into_owned())),
                                Err(e) => bail!("failed to localize `{url}`: {e:?}"),
                            }
                        });
                    }
                }
            }

            // Record each completed download's location on its input; any
            // failure aborts localization.
            while let Some(result) = downloads.join_next().await {
                match result {
                    Ok(Ok((idx, location))) => {
                        inputs
                            .get_mut(idx)
                            .expect("index from should be valid")
                            .set_location(location);
                    }
                    Ok(Err(e)) => {
                        bail!(e)
                    }
                    Err(e) => {
                        // The spawned download task itself failed (e.g.
                        // panicked or was cancelled).
                        bail!("download task failed: {e:?}")
                    }
                }
            }

            Ok(())
        }
        .boxed()
    }

    /// Spawns the given task request, returning channels that are signaled
    /// when the task has spawned and when it has completed.
    ///
    /// # Errors
    ///
    /// Returns an error if the task's memory requirements or hints fail to
    /// parse.
    fn spawn(
        &self,
        request: TaskSpawnRequest,
        token: CancellationToken,
    ) -> Result<TaskExecutionEvents> {
        let (spawned_tx, spawned_rx) = oneshot::channel();
        let (completed_tx, completed_rx) = oneshot::channel();

        let requirements = request.requirements();
        let hints = request.hints();

        let container = container(requirements, self.config.task.container.as_deref()).into_owned();
        let cpu = cpu(requirements);
        let memory = memory(requirements)? as u64;
        let max_cpu = max_cpu(hints);
        let max_memory = max_memory(hints)?.map(|i| i as u64);

        // Combine the request id with a generated suffix so concurrent runs
        // of the same task get distinct names.
        let name = format!(
            "{id}-{generated}",
            id = request.id(),
            generated = self
                .generator
                .lock()
                .expect("generator should always acquire")
                .next()
                .expect("generator should never be exhausted")
        );
        self.manager.send(
            DockerTaskRequest {
                config: self.config.clone(),
                inner: request,
                backend: self.inner.clone(),
                name,
                container,
                cpu,
                memory,
                max_cpu,
                max_memory,
                token,
            },
            spawned_tx,
            completed_tx,
        );

        Ok(TaskExecutionEvents {
            spawned: spawned_rx,
            completed: completed_rx,
        })
    }

    /// Runs a cleanup task that recursively changes ownership of the output
    /// directory to the current user/group, since containerized executions
    /// may have written files as a different (e.g. root) user.
    ///
    /// Returns `None` when the output directory does not exist. Cleanup
    /// failures are logged rather than propagated.
    #[cfg(unix)]
    fn cleanup<'a, 'b, 'c>(
        &'a self,
        output_dir: &'b Path,
        token: CancellationToken,
    ) -> Option<BoxFuture<'c, ()>>
    where
        'a: 'c,
        'b: 'c,
        Self: 'c,
    {
        /// The guest path at which the output directory is mounted.
        const GUEST_OUT_DIR: &str = "/workflow_output";

        /// The number of CPUs reserved for the cleanup task.
        const CLEANUP_CPU: f64 = 0.1;

        /// The amount of RAM (in GiB) reserved for the cleanup task.
        const CLEANUP_MEMORY: f64 = 0.05;

        let backend = self.inner.clone();
        let generator = self.generator.clone();
        let output_path = std::path::absolute(output_dir).expect("failed to get absolute path");
        if !output_path.is_dir() {
            info!("output directory does not exist: skipping cleanup");
            return None;
        }

        Some(
            async move {
                let result = async {
                    // SAFETY: `getuid` and `getgid` have no preconditions and
                    // cannot fail.
                    let (uid, gid) = unsafe { (libc::getuid(), libc::getgid()) };
                    let ownership = format!("{uid}:{gid}");
                    // Mount the output directory writable so `chown` can
                    // modify it.
                    let output_mount = Input::builder()
                        .path(GUEST_OUT_DIR)
                        .contents(Contents::Path(output_path.clone()))
                        .ty(InputType::Directory)
                        .read_only(false)
                        .build();

                    let name = format!(
                        "docker-backend-cleanup-{id}",
                        id = generator
                            .lock()
                            .expect("generator should always acquire")
                            .next()
                            .expect("generator should never be exhausted")
                    );

                    // Run `chown -R <uid>:<gid> /workflow_output` in a small
                    // alpine container.
                    let task = Task::builder()
                        .name(&name)
                        .executions(NonEmpty::new(
                            Execution::builder()
                                .image("alpine:latest")
                                .program("chown")
                                .args([
                                    "-R".to_string(),
                                    ownership.clone(),
                                    GUEST_OUT_DIR.to_string(),
                                ])
                                .work_dir("/")
                                .build(),
                        ))
                        .inputs([output_mount])
                        .resources(
                            Resources::builder()
                                .cpu(CLEANUP_CPU)
                                .ram(CLEANUP_MEMORY)
                                .build(),
                        )
                        .build();

                    info!(
                        "running cleanup task `{name}` to change ownership of `{path}` to \
                         `{ownership}`",
                        path = output_path.display(),
                    );

                    // The spawned receiver is dropped: the cleanup task only
                    // cares about completion, not the spawn event.
                    let (spawned_tx, _) = oneshot::channel();
                    let output_rx = backend
                        .run(task, Some(spawned_tx), token)
                        .map_err(|e| anyhow!("failed to submit cleanup task: {e}"))?;

                    let statuses = output_rx
                        .await
                        .map_err(|e| anyhow!("failed to run cleanup task: {e}"))?;
                    let status = statuses.first();
                    if status.success() {
                        Ok(())
                    } else {
                        bail!(
                            "failed to chown output directory `{path}`",
                            path = output_path.display()
                        );
                    }
                }
                .await;

                // Cleanup is best-effort: log failures instead of propagating.
                if let Err(e) = result {
                    tracing::error!("cleanup task failed: {e:#}");
                }
            }
            .boxed(),
        )
    }

    /// Cleanup is a no-op on non-Unix platforms (there is no uid/gid
    /// ownership to restore).
    #[cfg(not(unix))]
    fn cleanup<'a, 'b, 'c>(&'a self, _: &'b Path, _: CancellationToken) -> Option<BoxFuture<'c, ()>>
    where
        'a: 'c,
        'b: 'c,
        Self: 'c,
    {
        tracing::debug!("cleanup task is not supported on this platform");
        None
    }
}