use std::collections::HashMap;
use std::fs;
use std::sync::Arc;
use std::sync::Mutex;

use anyhow::Context;
use anyhow::Result;
use anyhow::bail;
use cloud_copy::UrlExt;
use crankshaft::config::backend;
use crankshaft::config::backend::tes::http::HttpAuthConfig;
use crankshaft::engine::Task;
use crankshaft::engine::service::name::GeneratorIterator;
use crankshaft::engine::service::name::UniqueAlphanumeric;
use crankshaft::engine::service::runner::Backend;
use crankshaft::engine::service::runner::backend::TaskRunError;
use crankshaft::engine::service::runner::backend::tes;
use crankshaft::engine::task::Execution;
use crankshaft::engine::task::Input;
use crankshaft::engine::task::Output;
use crankshaft::engine::task::Resources;
use crankshaft::engine::task::input::Contents;
use crankshaft::engine::task::input::Type as InputType;
use crankshaft::engine::task::output::Type as OutputType;
use futures::FutureExt;
use futures::future::BoxFuture;
use nonempty::NonEmpty;
use secrecy::ExposeSecret;
use tokio::task::JoinSet;
use tracing::debug;
use tracing::info;

use super::ExecuteTaskRequest;
use super::TaskExecutionBackend;
use super::TaskExecutionConstraints;
use super::TaskExecutionResult;
use crate::CancellationContext;
use crate::EvaluationPath;
use crate::EvaluationPathKind;
use crate::Events;
use crate::ONE_GIBIBYTE;
use crate::PrimitiveValue;
use crate::TaskInputs;
use crate::Value;
use crate::backend::INITIAL_EXPECTED_NAMES;
use crate::backend::STDERR_FILE_NAME;
use crate::backend::STDOUT_FILE_NAME;
use crate::backend::WORK_DIR_NAME;
use crate::config::Config;
use crate::config::ContentDigestMode;
use crate::config::DEFAULT_TASK_SHELL;
use crate::config::TesBackendAuthConfig;
use crate::digest::UrlDigestExt;
use crate::digest::calculate_local_digest;
use crate::http::Transferer;
use crate::v1::DEFAULT_DISK_MOUNT_POINT;
use crate::v1::DEFAULT_TASK_REQUIREMENT_DISKS;
use crate::v1::hints;
use crate::v1::requirements;
use crate::v1::requirements::ContainerSource;

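/// The guest path for the task's working directory.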
const GUEST_WORK_DIR: &str = "/mnt/task/work";

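/// The guest path for the file containing the evaluated task command.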
const GUEST_COMMAND_PATH: &str = "/mnt/task/command";

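/// The guest path for the file capturing the task's stdout.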
const GUEST_STDOUT_PATH: &str = "/mnt/task/stdout";

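/// The guest path for the file capturing the task's stderr.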
const GUEST_STDERR_PATH: &str = "/mnt/task/stderr";

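/// The default interval used when polling the TES service for task status.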
const DEFAULT_TES_INTERVAL: u64 = 1;

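/// A task execution backend that submits tasks to a Task Execution Service
/// (TES) endpoint.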
pub struct TesBackend {
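    /// The engine configuration.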
    config: Arc<Config>,
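    /// The underlying Crankshaft TES backend.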
    inner: tes::Backend,
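    /// The generator of unique task names.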
    names: Arc<Mutex<GeneratorIterator<UniqueAlphanumeric>>>,
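    /// The cancellation context used to interrupt task execution.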
    cancellation: CancellationContext,
}

impl TesBackend {
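    /// Constructs a new TES task execution backend.
    ///
    /// Returns an error if the configured backend is not a TES backend.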
    pub async fn new(
        config: Arc<Config>,
        events: Events,
        cancellation: CancellationContext,
    ) -> Result<Self> {
        info!("initializing TES backend");

        let backend_config = config.backend()?;
        let backend_config = backend_config
            .as_tes()
            .context("configured backend is not TES")?;

        let mut http = backend::tes::http::Config::default();
        match &backend_config.auth {
            Some(TesBackendAuthConfig::Basic(config)) => {
                http.auth = Some(HttpAuthConfig::Basic {
                    username: config.username.clone(),
                    password: config.password.inner().expose_secret().to_string(),
                });
            }
            Some(TesBackendAuthConfig::Bearer(config)) => {
                http.auth = Some(HttpAuthConfig::Bearer {
                    token: config.token.inner().expose_secret().to_string(),
                });
            }
            None => {}
        }

        http.retries = backend_config.retries;
        http.max_concurrency = backend_config.max_concurrency.map(|c| c as usize);

        let names = Arc::new(Mutex::new(GeneratorIterator::new(
            UniqueAlphanumeric::default_with_expected_generations(INITIAL_EXPECTED_NAMES),
            INITIAL_EXPECTED_NAMES,
        )));

        let inner = tes::Backend::initialize(
            backend::tes::Config::builder()
                .url(backend_config.url.clone().expect("should have URL"))
                .http(http)
                .interval(backend_config.interval.unwrap_or(DEFAULT_TES_INTERVAL))
                .build(),
            names.clone(),
            events.crankshaft().clone(),
        )
        .await;

        Ok(Self {
            config,
            inner,
            names,
            cancellation,
        })
    }
}

impl TaskExecutionBackend for TesBackend {
    fn constraints(
        &self,
        inputs: &TaskInputs,
        requirements: &HashMap<String, Value>,
        hints: &HashMap<String, Value>,
    ) -> Result<TaskExecutionConstraints> {
        let container =
            requirements::container(inputs, requirements, self.config.task.container.as_deref());
        match &container {
            ContainerSource::Docker(_) | ContainerSource::Library(_) | ContainerSource::Oras(_) => {
            }
            ContainerSource::SifFile(_) => {
                bail!(
                    "TES backend does not support local SIF file `{container:#}`; use a \
                     registry-based container image instead"
                )
            }
            ContainerSource::Unknown(_) => {
                bail!("TES backend does not support unknown container source `{container:#}`")
            }
        };

        let disks = requirements::disks(inputs, requirements, hints)?;
        if disks.values().any(|d| d.ty.is_some()) {
            debug!("disk type hints are not supported by the TES backend and will be ignored");
        }

        Ok(TaskExecutionConstraints {
            container: Some(container),
            cpu: requirements::cpu(inputs, requirements),
            memory: requirements::memory(inputs, requirements)? as u64,
            gpu: Default::default(),
            fpga: Default::default(),
            disks: disks
                .into_iter()
                .map(|(mp, disk)| (mp.to_string(), disk.size))
                .collect(),
        })
    }

    fn needs_local_inputs(&self) -> bool {
        false
    }

    fn execute<'a>(
        &'a self,
        transferer: &'a Arc<dyn Transferer>,
        request: ExecuteTaskRequest<'a>,
    ) -> BoxFuture<'a, Result<Option<TaskExecutionResult>>> {
        async move {
            let backend_config = self.config.backend()?;
            let backend_config = backend_config
                .as_tes()
                .expect("configured backend should be TES");

            let preemptible = hints::preemptible(request.inputs, request.hints)?;
            let max_memory =
                hints::max_memory(request.inputs, request.hints)?.map(|m| m as f64 / ONE_GIBIBYTE);
            let name = format!(
                "{id}-{generated}",
                id = request.id,
                generated = self
                    .names
                    .lock()
                    .expect("generator should always acquire")
                    .next()
                    .expect("generator should never be exhausted")
            );

            let command_path = request.command_path();

            // Create the parent directory for the command file if it doesn't exist
            if let Some(parent) = command_path.parent() {
                fs::create_dir_all(parent).with_context(|| {
                    format!(
                        "failed to create directory `{path}`",
                        path = parent.display()
                    )
                })?;
            }

            fs::write(&command_path, request.command).with_context(|| {
                format!(
                    "failed to write command contents to `{path}`",
                    path = command_path.display()
                )
            })?;

            // The base URL to which local inputs are uploaded
            let inputs_url = Arc::new(
                backend_config
                    .inputs
                    .clone()
                    .expect("should have inputs URL"),
            );

            let mut backend_inputs = vec![
                // The command file is always provided to the guest
                Input::builder()
                    .path(GUEST_COMMAND_PATH)
                    .contents(Contents::Path(command_path.to_path_buf()))
                    .ty(InputType::File)
                    .read_only(true)
                    .build(),
            ];

            // Upload local inputs to the inputs URL; remote inputs are passed
            // to the task by URL as-is
            let mut uploads = JoinSet::new();
            for (i, input) in request.backend_inputs.iter().enumerate() {
                match input.path().kind() {
                    EvaluationPathKind::Local(path) => {
                        let kind = input.kind();
                        let path = path.to_path_buf();
                        let transferer = transferer.clone();
                        let inputs_url = inputs_url.clone();
                        uploads.spawn(async move {
                            let url = inputs_url.join_digest(
                                calculate_local_digest(&path, kind, ContentDigestMode::Strong)
                                    .await
                                    .with_context(|| {
                                        format!(
                                            "failed to calculate digest of `{path}`",
                                            path = path.display()
                                        )
                                    })?,
                            );
                            transferer
                                .upload(&path, &url)
                                .await
                                .with_context(|| {
                                    format!(
                                        "failed to upload `{path}` to `{url}`",
                                        path = path.display(),
                                        url = url.display()
                                    )
                                })
                                .map(|_| (i, url))
                        });
                    }
                    EvaluationPathKind::Remote(url) => {
                        backend_inputs.push(
                            // The input is already remote; reference it by URL
                            Input::builder()
                                .path(
                                    input
                                        .guest_path()
                                        .expect("input should have guest path")
                                        .as_str(),
                                )
                                .contents(Contents::Url(url.clone()))
                                .ty(input.kind())
                                .read_only(true)
                                .build(),
                        );
                    }
                }
            }

            // Add an input for each successfully uploaded local input
            while let Some(result) = uploads.join_next().await {
                let (i, url) = result.context("failed to join upload task")??;
                let input = &request.backend_inputs[i];
                backend_inputs.push(
                    Input::builder()
                        .path(
                            input
                                .guest_path()
                                .expect("input should have guest path")
                                .as_str(),
                        )
                        .contents(Contents::Url(url))
                        .ty(input.kind())
                        .read_only(true)
                        .build(),
                );
            }

            let output_dir = format!(
                "{name}-{timestamp}/",
                timestamp = chrono::Utc::now().format("%Y%m%d-%H%M%S")
            );

            // Task outputs are uploaded to a timestamped directory under the
            // configured outputs URL
            let outputs_url = backend_config
                .outputs
                .as_ref()
                .expect("should have outputs URL")
                .join(&output_dir)
                .expect("should join");

            let mut work_dir_url = outputs_url.join(WORK_DIR_NAME).expect("should join");
            let stdout_url = outputs_url.join(STDOUT_FILE_NAME).expect("should join");
            let stderr_url = outputs_url.join(STDERR_FILE_NAME).expect("should join");

            // The task's outputs are its working directory, stdout, and stderr
            let outputs = vec![
                Output::builder()
                    .path(GUEST_WORK_DIR)
                    .url(work_dir_url.clone())
                    .ty(OutputType::Directory)
                    .build(),
                Output::builder()
                    .path(GUEST_STDOUT_PATH)
                    .url(stdout_url.clone())
                    .ty(OutputType::File)
                    .build(),
                Output::builder()
                    .path(GUEST_STDERR_PATH)
                    .url(stderr_url.clone())
                    .ty(OutputType::File)
                    .build(),
            ];

            let disks = &request.constraints.disks;

            // Calculate the total disk space to request; the default disk
            // requirement is included when the default mount point isn't
            // explicitly specified
            let disk: f64 = if disks.is_empty() {
                DEFAULT_TASK_REQUIREMENT_DISKS
            } else {
                let sum: f64 = disks.values().map(|size| *size as f64).sum();
                if disks.contains_key(DEFAULT_DISK_MOUNT_POINT) {
                    sum
                } else {
                    sum + DEFAULT_TASK_REQUIREMENT_DISKS
                }
            };

            let volumes = request
                .constraints
                .disks
                .keys()
                .filter_map(|mp| {
                    if mp == DEFAULT_DISK_MOUNT_POINT {
                        // The default mount point is covered by the task's disk
                        // resource rather than an additional volume
                        None
                    } else {
                        Some(mp.clone())
                    }
                })
                .collect::<Vec<_>>();

            if !volumes.is_empty() {
                debug!(
                    "disk size constraints cannot be enforced by the TES backend; mount points \
                     will be created but sizes will not be limited"
                );
            }
408
409 let mut preemptible = preemptible;
410 loop {
411 let task = Task::builder()
412 .name(&name)
413 .executions(NonEmpty::new(
414 Execution::builder()
415 .image(
416 match request
417 .constraints
418 .container
419 .as_ref()
420 .expect("constraints should have a container")
421 {
422 ContainerSource::Docker(s) => s.clone(),
424 c => format!("{c:#}"),
425 },
426 )
427 .program(
428 self.config
429 .task
430 .shell
431 .as_deref()
432 .unwrap_or(DEFAULT_TASK_SHELL),
433 )
434 .args([GUEST_COMMAND_PATH.to_string()])
435 .work_dir(GUEST_WORK_DIR)
436 .env(request.env.clone())
437 .stdout(GUEST_STDOUT_PATH)
438 .stderr(GUEST_STDERR_PATH)
439 .build(),
440 ))
441 .inputs(backend_inputs.clone())
442 .outputs(outputs.clone())
443 .resources(
444 Resources::builder()
445 .cpu(request.constraints.cpu)
446 .maybe_cpu_limit(hints::max_cpu(request.inputs, request.hints))
447 .ram(request.constraints.memory as f64 / ONE_GIBIBYTE)
448 .disk(disk)
449 .maybe_ram_limit(max_memory)
450 .preemptible(preemptible > 0)
451 .build(),
452 )
453 .volumes(volumes.clone())
454 .build();
455
456 let statuses = match self.inner.run(task, self.cancellation.second())?.await {
457 Ok(statuses) => statuses,
458 Err(TaskRunError::Preempted) if preemptible > 0 => {
459 preemptible -= 1;
461 continue;
462 }
463 Err(TaskRunError::Canceled) => return Ok(None),
464 Err(e) => return Err(e.into()),
465 };
466
467 assert_eq!(statuses.len(), 1, "there should only be one output");
468 let status = statuses.first();
469
470 work_dir_url.path_segments_mut().unwrap().push("");
473
474 return Ok(Some(TaskExecutionResult {
475 exit_code: status.code().expect("should have exit code"),
476 work_dir: EvaluationPath::try_from(work_dir_url)?,
477 stdout: PrimitiveValue::new_file(stdout_url).into(),
478 stderr: PrimitiveValue::new_file(stderr_url).into(),
479 }));
480 }
481 }
482 .boxed()
483 }
484}