1use std::collections::HashMap;
4use std::fs;
5use std::path::Path;
6use std::sync::Arc;
7use std::sync::Mutex;
8
9use anyhow::Context;
10use anyhow::Result;
11use anyhow::anyhow;
12use anyhow::bail;
13use crankshaft::config::backend;
14use crankshaft::engine::Task;
15use crankshaft::engine::service::name::GeneratorIterator;
16use crankshaft::engine::service::name::UniqueAlphanumeric;
17use crankshaft::engine::service::runner::Backend;
18use crankshaft::engine::service::runner::backend::docker;
19use crankshaft::engine::task::Execution;
20use crankshaft::engine::task::Input;
21use crankshaft::engine::task::Output;
22use crankshaft::engine::task::Resources;
23use crankshaft::engine::task::input::Contents;
24use crankshaft::engine::task::input::Type as InputType;
25use crankshaft::engine::task::output::Type as OutputType;
26use futures::FutureExt;
27use futures::future::BoxFuture;
28use nonempty::NonEmpty;
29use tokio::sync::oneshot;
30use tokio::task::JoinSet;
31use tokio_util::sync::CancellationToken;
32use tracing::info;
33use url::Url;
34
35use super::TaskExecutionBackend;
36use super::TaskExecutionConstraints;
37use super::TaskExecutionEvents;
38use super::TaskExecutionResult;
39use super::TaskManager;
40use super::TaskManagerRequest;
41use super::TaskSpawnRequest;
42use crate::COMMAND_FILE_NAME;
43use crate::InputTrie;
44use crate::ONE_GIBIBYTE;
45use crate::PrimitiveValue;
46use crate::STDERR_FILE_NAME;
47use crate::STDOUT_FILE_NAME;
48use crate::Value;
49use crate::WORK_DIR_NAME;
50use crate::config::DEFAULT_TASK_SHELL;
51use crate::config::DockerBackendConfig;
52use crate::config::TaskConfig;
53use crate::http::Downloader;
54use crate::http::HttpDownloader;
55use crate::http::Location;
56use crate::path::EvaluationPath;
57use crate::v1::container;
58use crate::v1::cpu;
59use crate::v1::max_cpu;
60use crate::v1::max_memory;
61use crate::v1::memory;
62
/// The number of initially expected unique task names.
///
/// Used to size the unique-name generator's initial allocation.
const INITIAL_EXPECTED_NAMES: usize = 1000;

/// The guest (in-container) directory under which task inputs are mounted.
const GUEST_INPUTS_DIR: &str = "/mnt/inputs";

/// The guest path of the task's working directory mount.
const GUEST_WORK_DIR: &str = "/mnt/work";

/// The guest path of the mounted command file to execute.
const GUEST_COMMAND_PATH: &str = "/mnt/command";

/// The guest path to which the task's stdout is written.
const GUEST_STDOUT_PATH: &str = "/stdout";

/// The guest path to which the task's stderr is written.
const GUEST_STDERR_PATH: &str = "/stderr";
/// Represents a request to run a task on the Docker backend.
#[derive(Debug)]
struct DockerTaskRequest {
    /// The underlying task spawn request.
    inner: TaskSpawnRequest,
    /// The Docker backend used to run the task.
    backend: Arc<docker::Backend>,
    /// The generated, unique name of the task.
    name: String,
    /// The shell to execute the command with, if one was configured.
    shell: Arc<Option<String>>,
    /// The container image to run the task in.
    container: String,
    /// The CPU reservation for the task.
    cpu: f64,
    /// The memory reservation for the task, in bytes
    /// (converted to GiB before being handed to crankshaft).
    memory: u64,
    /// The CPU limit for the task (from hints), if any.
    max_cpu: Option<f64>,
    /// The memory limit for the task in bytes (from hints), if any.
    max_memory: Option<u64>,
    /// The token used to cancel the task.
    token: CancellationToken,
}
109
impl TaskManagerRequest for DockerTaskRequest {
    /// Returns the task's CPU reservation.
    fn cpu(&self) -> f64 {
        self.cpu
    }

    /// Returns the task's memory reservation, in bytes.
    fn memory(&self) -> u64 {
        self.memory
    }

    /// Runs the task to completion on the Docker backend.
    ///
    /// Creates the attempt's working directory and command file on the host,
    /// constructs the container mounts and outputs, submits the task, and
    /// returns its exit code plus the host paths of the captured
    /// stdout/stderr files.
    async fn run(self, spawned: oneshot::Sender<()>) -> Result<TaskExecutionResult> {
        // Create the host-side working directory; it is mounted read-write
        // into the container at `GUEST_WORK_DIR` below.
        let work_dir = self.inner.attempt_dir().join(WORK_DIR_NAME);
        fs::create_dir_all(&work_dir).with_context(|| {
            format!(
                "failed to create directory `{path}`",
                path = work_dir.display()
            )
        })?;

        // Write the task's command to a host file; it is mounted read-only
        // into the container at `GUEST_COMMAND_PATH` below.
        let command_path = self.inner.attempt_dir().join(COMMAND_FILE_NAME);
        fs::write(&command_path, self.inner.command()).with_context(|| {
            format!(
                "failed to write command contents to `{path}`",
                path = command_path.display()
            )
        })?;

        // Reserve room for every task input plus the work-dir and command
        // mounts added below.
        let mut inputs = Vec::with_capacity(self.inner.inputs().len() + 2);
        for input in self.inner.inputs().iter() {
            if let Some(guest_path) = input.guest_path() {
                let location = input.location().expect("all inputs should have localized");

                // Only mount inputs whose localized location exists on the
                // host; nonexistent locations are silently skipped.
                if location.exists() {
                    inputs.push(
                        Input::builder()
                            .path(guest_path)
                            .contents(Contents::Path(location.into()))
                            .ty(input.kind())
                            .read_only(true)
                            .build(),
                    );
                }
            }
        }

        // Mount the working directory read-write.
        inputs.push(
            Input::builder()
                .path(GUEST_WORK_DIR)
                .contents(Contents::Path(work_dir.to_path_buf()))
                .ty(InputType::Directory)
                .read_only(false)
                .build(),
        );

        // Mount the command file read-only.
        inputs.push(
            Input::builder()
                .path(GUEST_COMMAND_PATH)
                .contents(Contents::Path(command_path.to_path_buf()))
                .ty(InputType::File)
                .read_only(true)
                .build(),
        );

        let stdout_path = self.inner.attempt_dir().join(STDOUT_FILE_NAME);
        let stderr_path = self.inner.attempt_dir().join(STDERR_FILE_NAME);

        // Declare the container's stdout/stderr as outputs targeting the
        // corresponding files in the attempt directory on the host.
        let outputs = vec![
            Output::builder()
                .path(GUEST_STDOUT_PATH)
                .url(Url::from_file_path(&stdout_path).expect("path should be absolute"))
                .ty(OutputType::File)
                .build(),
            Output::builder()
                .path(GUEST_STDERR_PATH)
                .url(Url::from_file_path(&stderr_path).expect("path should be absolute"))
                .ty(OutputType::File)
                .build(),
        ];

        let task = Task::builder()
            .name(self.name)
            .executions(NonEmpty::new(
                Execution::builder()
                    .image(&self.container)
                    // Fall back to the default shell when none was configured.
                    .program(self.shell.as_deref().unwrap_or(DEFAULT_TASK_SHELL))
                    // NOTE(review): `-C` enables the shell's noclobber option
                    // and treats `GUEST_COMMAND_PATH` as the script operand;
                    // confirm that `-c` (command string) was not intended.
                    .args(["-C".to_string(), GUEST_COMMAND_PATH.to_string()])
                    .work_dir(GUEST_WORK_DIR)
                    .env({
                        // Rewrite environment values that reference a host
                        // input path to use that input's guest path instead.
                        let mut final_env = indexmap::IndexMap::new();
                        for (k, v) in self.inner.env() {
                            let guest_path = self
                                .inner
                                .inputs()
                                .iter()
                                .find(|input| input.path().to_str() == Some(v))
                                .and_then(|input| input.guest_path());

                            final_env.insert(k.clone(), guest_path.unwrap_or(v).to_string());
                        }
                        final_env
                    })
                    .stdout(GUEST_STDOUT_PATH)
                    .stderr(GUEST_STDERR_PATH)
                    .build(),
            ))
            .inputs(inputs)
            .outputs(outputs)
            .resources(
                Resources::builder()
                    .cpu(self.cpu)
                    .maybe_cpu_limit(self.max_cpu)
                    // Convert bytes to GiB for the resources builder.
                    .ram(self.memory as f64 / ONE_GIBIBYTE)
                    .maybe_ram_limit(self.max_memory.map(|m| m as f64 / ONE_GIBIBYTE))
                    .build(),
            )
            .build();

        // Submit the task, forwarding the `spawned` notification sender and
        // the cancellation token, and wait for the exit statuses.
        let statuses = self
            .backend
            .run(task, Some(spawned), self.token.clone())
            .map_err(|e| anyhow!("{e:#}"))?
            .await
            .map_err(|e| anyhow!("{e:#}"))?;

        // A single execution was submitted, so exactly one status is expected.
        assert_eq!(statuses.len(), 1, "there should only be one exit status");
        let status = statuses.first();

        Ok(TaskExecutionResult {
            inputs: self.inner.info.inputs,
            exit_code: status.code().expect("should have exit code"),
            work_dir: EvaluationPath::Local(work_dir),
            stdout: PrimitiveValue::new_file(
                stdout_path
                    .into_os_string()
                    .into_string()
                    .expect("path should be UTF-8"),
            )
            .into(),
            stderr: PrimitiveValue::new_file(
                stderr_path
                    .into_os_string()
                    .into_string()
                    .expect("path should be UTF-8"),
            )
            .into(),
        })
    }
}
264
/// Represents the Docker task execution backend.
pub struct DockerBackend {
    /// The underlying crankshaft Docker backend.
    inner: Arc<docker::Backend>,
    /// The configured shell to use for tasks, if any.
    shell: Arc<Option<String>>,
    /// The default container image used when a task does not specify one.
    container: Option<String>,
    /// The maximum number of tasks that may run concurrently.
    max_concurrency: u64,
    /// The maximum CPU count reported by the backend's resources.
    max_cpu: u64,
    /// The maximum memory reported by the backend's resources, in bytes
    /// (compared against task requirements and reported in GiB).
    max_memory: u64,
    /// The task manager that schedules and dispatches task requests.
    manager: TaskManager<DockerTaskRequest>,
    /// The generator of unique task name suffixes.
    generator: Arc<Mutex<GeneratorIterator<UniqueAlphanumeric>>>,
}
284
285impl DockerBackend {
286 pub async fn new(task: &TaskConfig, config: &DockerBackendConfig) -> Result<Self> {
289 task.validate()?;
290 config.validate()?;
291
292 info!("initializing Docker backend");
293
294 let backend = docker::Backend::initialize_default_with(
295 backend::docker::Config::builder()
296 .cleanup(config.cleanup)
297 .build(),
298 )
299 .await
300 .map_err(|e| anyhow!("{e:#}"))
301 .context("failed to initialize Docker backend")?;
302
303 let resources = *backend.resources();
304 let cpu = resources.cpu();
305 let max_cpu = resources.max_cpu();
306 let memory = resources.memory();
307 let max_memory = resources.max_memory();
308
309 let manager = if resources.use_service() {
313 TaskManager::new_unlimited(max_cpu, max_memory)
314 } else {
315 TaskManager::new(cpu, max_cpu, memory, max_memory)
316 };
317
318 Ok(Self {
319 inner: Arc::new(backend),
320 shell: Arc::new(task.shell.clone()),
321 container: task.shell.clone(),
322 max_concurrency: cpu,
323 max_cpu,
324 max_memory,
325 manager,
326 generator: Arc::new(Mutex::new(GeneratorIterator::new(
327 UniqueAlphanumeric::default_with_expected_generations(INITIAL_EXPECTED_NAMES),
328 INITIAL_EXPECTED_NAMES,
329 ))),
330 })
331 }
332}
333
impl TaskExecutionBackend for DockerBackend {
    /// Returns the maximum number of concurrently executing tasks.
    fn max_concurrency(&self) -> u64 {
        self.max_concurrency
    }

    /// Computes the execution constraints for a task from its requirements.
    ///
    /// # Errors
    ///
    /// Fails when the requested CPU or memory exceeds what this backend can
    /// provide.
    fn constraints(
        &self,
        requirements: &HashMap<String, Value>,
        _: &HashMap<String, Value>,
    ) -> Result<TaskExecutionConstraints> {
        // Fall back to the backend's default container when the requirements
        // don't specify one.
        let container = container(requirements, self.container.as_deref());

        let cpu = cpu(requirements);
        if (self.max_cpu as f64) < cpu {
            bail!(
                "task requires at least {cpu} CPU{s}, but the execution backend has a maximum of \
                 {max_cpu}",
                s = if cpu == 1.0 { "" } else { "s" },
                max_cpu = self.max_cpu,
            );
        }

        let memory = memory(requirements)?;
        if self.max_memory < memory as u64 {
            // Report both quantities in GiB for readability.
            let memory = memory as f64 / ONE_GIBIBYTE;
            let max_memory = self.max_memory as f64 / ONE_GIBIBYTE;

            bail!(
                "task requires at least {memory} GiB of memory, but the execution backend has a \
                 maximum of {max_memory} GiB",
            );
        }

        Ok(TaskExecutionConstraints {
            container: Some(container.into_owned()),
            cpu,
            memory,
            gpu: Default::default(),
            fpga: Default::default(),
            disks: Default::default(),
        })
    }

    /// Returns the guest path of the task's working directory.
    fn guest_work_dir(&self) -> Option<&Path> {
        Some(Path::new(GUEST_WORK_DIR))
    }

    /// Localizes the given inputs for execution in a container.
    ///
    /// Calculates a guest path for each input, resolves local inputs in
    /// place, and downloads remote inputs concurrently.
    fn localize_inputs<'a, 'b, 'c, 'd>(
        &'a self,
        downloader: &'b HttpDownloader,
        inputs: &'c mut [crate::eval::Input],
    ) -> BoxFuture<'d, Result<()>>
    where
        'a: 'd,
        'b: 'd,
        'c: 'd,
        Self: 'd,
    {
        async move {
            // Build a trie of the input paths so non-colliding guest paths
            // can be computed under `GUEST_INPUTS_DIR`.
            let mut trie = InputTrie::default();
            for input in inputs.iter() {
                trie.insert(input)?;
            }

            // Assign each input its computed guest path; the trie returns
            // indexes into `inputs`.
            for (index, guest_path) in trie.calculate_guest_paths(GUEST_INPUTS_DIR)? {
                if let Some(input) = inputs.get_mut(index) {
                    input.set_guest_path(guest_path);
                } else {
                    bail!("invalid index {} returned from trie", index);
                }
            }

            // Local inputs resolve immediately; remote inputs are downloaded
            // concurrently via the join set.
            let mut downloads = JoinSet::new();
            for (idx, input) in inputs.iter_mut().enumerate() {
                match input.path() {
                    EvaluationPath::Local(path) => {
                        input.set_location(Location::Path(path.clone().into()));
                    }
                    EvaluationPath::Remote(url) => {
                        let downloader = downloader.clone();
                        let url = url.clone();
                        downloads.spawn(async move {
                            let location_result = downloader.download(&url).await;

                            // Tag successful downloads with the input index so
                            // the result can be recorded below.
                            match location_result {
                                Ok(location) => Ok((idx, location.into_owned())),
                                Err(e) => bail!("failed to localize `{url}`: {e:?}"),
                            }
                        });
                    }
                }
            }

            // Record each completed download's location on its input; any
            // download or join failure aborts localization.
            while let Some(result) = downloads.join_next().await {
                match result {
                    Ok(Ok((idx, location))) => {
                        inputs
                            .get_mut(idx)
                            .expect("index from should be valid")
                            .set_location(location);
                    }
                    Ok(Err(e)) => {
                        bail!(e)
                    }
                    Err(e) => {
                        bail!("download task failed: {e:?}")
                    }
                }
            }

            Ok(())
        }
        .boxed()
    }

    /// Spawns the given task request on the Docker backend.
    ///
    /// Returns the events that are signaled when the task is spawned and
    /// when it completes.
    fn spawn(
        &self,
        request: TaskSpawnRequest,
        token: CancellationToken,
    ) -> Result<TaskExecutionEvents> {
        let (spawned_tx, spawned_rx) = oneshot::channel();
        let (completed_tx, completed_rx) = oneshot::channel();

        let requirements = request.requirements();
        let hints = request.hints();

        // Resolve the container and resource reservations/limits for the
        // task from its requirements and hints.
        let container = container(requirements, self.container.as_deref()).into_owned();
        let cpu = cpu(requirements);
        let memory = memory(requirements)? as u64;
        let max_cpu = max_cpu(hints);
        let max_memory = max_memory(hints)?.map(|i| i as u64);

        // Suffix the task id with a generated string to keep names unique.
        let name = format!(
            "{id}-{generated}",
            id = request.id(),
            generated = self
                .generator
                .lock()
                .expect("generator should always acquire")
                .next()
                .expect("generator should never be exhausted")
        );
        self.manager.send(
            DockerTaskRequest {
                inner: request,
                shell: self.shell.clone(),
                backend: self.inner.clone(),
                name,
                container,
                cpu,
                memory,
                max_cpu,
                max_memory,
                token,
            },
            spawned_tx,
            completed_tx,
        );

        Ok(TaskExecutionEvents {
            spawned: spawned_rx,
            completed: completed_rx,
        })
    }

    /// Runs a cleanup task that recursively changes ownership of the output
    /// directory to the current user (Unix only).
    ///
    /// Returns `None` when the output directory does not exist.
    #[cfg(unix)]
    fn cleanup<'a, 'b, 'c>(
        &'a self,
        output_dir: &'b Path,
        token: CancellationToken,
    ) -> Option<BoxFuture<'c, ()>>
    where
        'a: 'c,
        'b: 'c,
        Self: 'c,
    {
        // The guest path at which the output directory is mounted.
        const GUEST_OUT_DIR: &str = "/workflow_output";

        // The CPU reservation for the cleanup task.
        const CLEANUP_CPU: f64 = 0.1;

        // The memory reservation for the cleanup task (same unit as
        // `Resources::ram`, which receives GiB elsewhere in this file).
        const CLEANUP_MEMORY: f64 = 0.05;

        let backend = self.inner.clone();
        let generator = self.generator.clone();
        let output_path = std::path::absolute(output_dir).expect("failed to get absolute path");
        if !output_path.is_dir() {
            info!("output directory does not exist: skipping cleanup");
            return None;
        }

        Some(
            async move {
                let result = async {
                    // SAFETY: `getuid` and `getgid` are always successful and
                    // have no preconditions (POSIX).
                    let (uid, gid) = unsafe { (libc::getuid(), libc::getgid()) };
                    let ownership = format!("{uid}:{gid}");
                    // Mount the output directory read-write so `chown` can
                    // modify it.
                    let output_mount = Input::builder()
                        .path(GUEST_OUT_DIR)
                        .contents(Contents::Path(output_path.clone()))
                        .ty(InputType::Directory)
                        .read_only(false)
                        .build();

                    let name = format!(
                        "docker-backend-cleanup-{id}",
                        id = generator
                            .lock()
                            .expect("generator should always acquire")
                            .next()
                            .expect("generator should never be exhausted")
                    );

                    // Run `chown -R <uid>:<gid>` over the mounted output
                    // directory in a minimal Alpine container.
                    let task = Task::builder()
                        .name(&name)
                        .executions(NonEmpty::new(
                            Execution::builder()
                                .image("alpine:latest")
                                .program("chown")
                                .args([
                                    "-R".to_string(),
                                    ownership.clone(),
                                    GUEST_OUT_DIR.to_string(),
                                ])
                                .work_dir("/")
                                .build(),
                        ))
                        .inputs([output_mount])
                        .resources(
                            Resources::builder()
                                .cpu(CLEANUP_CPU)
                                .ram(CLEANUP_MEMORY)
                                .build(),
                        )
                        .build();

                    info!(
                        "running cleanup task `{name}` to change ownership of `{path}` to \
                         `{ownership}`",
                        path = output_path.display(),
                    );

                    // The spawned notification is not needed here; the
                    // receiver half is dropped immediately.
                    let (spawned_tx, _) = oneshot::channel();
                    let output_rx = backend
                        .run(task, Some(spawned_tx), token)
                        .map_err(|e| anyhow!("failed to submit cleanup task: {e}"))?;

                    let statuses = output_rx
                        .await
                        .map_err(|e| anyhow!("failed to run cleanup task: {e}"))?;
                    let status = statuses.first();
                    if status.success() {
                        Ok(())
                    } else {
                        bail!(
                            "failed to chown output directory `{path}`",
                            path = output_path.display()
                        );
                    }
                }
                .await;

                // Cleanup failures are logged but do not propagate.
                if let Err(e) = result {
                    tracing::error!("cleanup task failed: {e:#}");
                }
            }
            .boxed(),
        )
    }

    /// Cleanup is not supported on non-Unix platforms.
    #[cfg(not(unix))]
    fn cleanup<'a, 'b, 'c>(&'a self, _: &'b Path, _: CancellationToken) -> Option<BoxFuture<'c, ()>>
    where
        'a: 'c,
        'b: 'c,
        Self: 'c,
    {
        tracing::debug!("cleanup task is not supported on this platform");
        None
    }
}