1#![allow(clippy::missing_docs_in_private_items)]
2
3use std::collections::HashMap;
16use std::fmt::Write as _;
17use std::path::PathBuf;
18use std::process::Stdio;
19use std::sync::Arc;
20
21use anyhow::Context as _;
22use anyhow::anyhow;
23use anyhow::bail;
24use crankshaft::events::Event;
25use images::sif_for_container;
26use nonempty::NonEmpty;
27use tokio::fs::File;
28use tokio::fs::{self};
29use tokio::io::AsyncBufReadExt;
30use tokio::io::BufReader;
31use tokio::process::Command;
32use tokio::sync::broadcast;
33use tokio_util::sync::CancellationToken;
34use tracing::error;
35use tracing::trace;
36use tracing::warn;
37
38use super::COMMAND_FILE_NAME;
39use super::TaskExecutionBackend;
40use super::TaskManager;
41use super::TaskManagerRequest;
42use super::TaskSpawnRequest;
43use super::WORK_DIR_NAME;
44use crate::PrimitiveValue;
45use crate::STDERR_FILE_NAME;
46use crate::STDOUT_FILE_NAME;
47use crate::TaskExecutionResult;
48use crate::Value;
49use crate::config::Config;
50use crate::path::EvaluationPath;
51use crate::v1;
52
53mod images;
54
/// File name, within the attempt directory, of the generated shell script that is submitted to
/// LSF; the script exports the task environment and invokes `apptainer exec`.
const APPTAINER_COMMAND_FILE_NAME: &str = "apptainer_command";

/// Guest directory for task inputs, reported to the engine via `guest_inputs_dir`
/// (input guest paths are presumably placed beneath it — confirm against the engine).
const GUEST_INPUTS_DIR: &str = "/mnt/task/inputs/";

/// Guest path of the WDL working directory: used as the container's `--cwd` and as the
/// bind-mount target of the host-side work directory.
const GUEST_WORK_DIR: &str = "/mnt/task/work";

/// Guest path at which the WDL command script is bind-mounted (read-only) and executed.
const GUEST_COMMAND_PATH: &str = "/mnt/task/command";

/// Guest path of the task's stdout file, bind-mounted from the attempt directory.
const GUEST_STDOUT_PATH: &str = "/mnt/task/stdout";

/// Guest path of the task's stderr file, bind-mounted from the attempt directory.
const GUEST_STDERR_PATH: &str = "/mnt/task/stderr";

/// Maximum LSF job name length; longer task identifiers are truncated before being passed to
/// `bsub -J`.
const LSF_JOB_NAME_MAX_LENGTH: usize = 4094;
77
/// A single task submission queued on the LSF + Apptainer backend's task manager.
#[derive(Debug)]
struct LsfApptainerTaskRequest {
    /// Backend configuration (queues, extra `bsub`/`apptainer` arguments, image settings).
    backend_config: Arc<LsfApptainerBackendConfig>,
    /// LSF job name passed to `bsub -J`; derived from the request id and length-limited.
    name: String,
    /// The engine's request describing the task to run.
    spawn_request: TaskSpawnRequest,
    /// Container reference that is resolved into a local SIF image before submission.
    container: String,
    /// Number of CPUs requested; rounded up when building the LSF `affinity` resource string.
    cpu: f64,
    /// Memory requested, presumably in bytes: it is divided by 1024 to form the LSF `rusage`
    /// value expressed in KB.
    memory: u64,
    /// Optional channel on which Crankshaft task lifecycle events are broadcast.
    crankshaft_events: Option<broadcast::Sender<Event>>,
    /// Token whose cancellation aborts waiting on the submitted job.
    cancellation_token: CancellationToken,
}
98
99impl TaskManagerRequest for LsfApptainerTaskRequest {
100 fn cpu(&self) -> f64 {
101 self.cpu
102 }
103
104 fn memory(&self) -> u64 {
105 self.memory
106 }
107
108 async fn run(self) -> anyhow::Result<super::TaskExecutionResult> {
109 let crankshaft_task_id = crankshaft::events::next_task_id();
110
111 let container_sif = sif_for_container(
112 &self.backend_config,
113 &self.container,
114 self.cancellation_token.clone(),
115 )
116 .await?;
117
118 let attempt_dir = self.spawn_request.attempt_dir();
119
120 let wdl_work_dir = attempt_dir.join(WORK_DIR_NAME);
122 fs::create_dir_all(&wdl_work_dir).await.with_context(|| {
123 format!(
124 "failed to create WDL working directory `{path}`",
125 path = wdl_work_dir.display()
126 )
127 })?;
128
129 let wdl_command_path = attempt_dir.join(COMMAND_FILE_NAME);
131 fs::write(&wdl_command_path, self.spawn_request.command())
132 .await
133 .with_context(|| {
134 format!(
135 "failed to write WDL command contents to `{path}`",
136 path = wdl_command_path.display()
137 )
138 })?;
139 #[cfg(unix)]
140 tokio::fs::set_permissions(
141 &wdl_command_path,
142 <std::fs::Permissions as std::os::unix::fs::PermissionsExt>::from_mode(0o770),
143 )
144 .await?;
145
146 let wdl_stdout_path = attempt_dir.join(STDOUT_FILE_NAME);
148 let _ = File::create(&wdl_stdout_path).await.with_context(|| {
149 format!(
150 "failed to create WDL stdout file `{path}`",
151 path = wdl_stdout_path.display()
152 )
153 })?;
154
155 let wdl_stderr_path = attempt_dir.join(STDERR_FILE_NAME);
157 let _ = File::create(&wdl_stderr_path).await.with_context(|| {
158 format!(
159 "failed to create WDL stderr file `{path}`",
160 path = wdl_stderr_path.display()
161 )
162 })?;
163
164 let container_tmp_path = self
172 .spawn_request
173 .temp_dir()
174 .join("container_tmp")
175 .to_path_buf();
176 tokio::fs::DirBuilder::new()
177 .recursive(true)
178 .create(&container_tmp_path)
179 .await
180 .with_context(|| {
181 format!(
182 "failed to create container /tmp directory at `{path}`",
183 path = container_tmp_path.display()
184 )
185 })?;
186 let container_var_tmp_path = self
187 .spawn_request
188 .temp_dir()
189 .join("container_var_tmp")
190 .to_path_buf();
191 tokio::fs::DirBuilder::new()
192 .recursive(true)
193 .create(&container_var_tmp_path)
194 .await
195 .with_context(|| {
196 format!(
197 "failed to create container /var/tmp directory at `{path}`",
198 path = container_var_tmp_path.display()
199 )
200 })?;
201
202 let apptainer_command_path = attempt_dir.join(APPTAINER_COMMAND_FILE_NAME);
205 let mut apptainer_command = String::new();
206 writeln!(&mut apptainer_command, "#!/bin/env bash")?;
207
208 for (k, v) in self.spawn_request.env().iter() {
213 writeln!(&mut apptainer_command, "export APPTAINERENV_{k}={v}")?;
214 }
215
216 write!(&mut apptainer_command, "apptainer -v exec ")?;
220 write!(&mut apptainer_command, "--cwd {GUEST_WORK_DIR} ")?;
221 write!(&mut apptainer_command, "--containall --cleanenv ")?;
225
226 for input in self.spawn_request.inputs() {
227 write!(
228 &mut apptainer_command,
229 "--mount type=bind,src={host_path},dst={guest_path},ro ",
230 host_path = input
231 .local_path()
232 .ok_or_else(|| anyhow!("input not localized: {input:?}"))?
233 .display(),
234 guest_path = input
235 .guest_path()
236 .ok_or_else(|| anyhow!("guest path missing: {input:?}"))?,
237 )?;
238 }
239
240 write!(
242 &mut apptainer_command,
243 "--mount type=bind,src={},dst={GUEST_COMMAND_PATH},ro ",
244 wdl_command_path.display()
245 )?;
246 write!(
249 &mut apptainer_command,
250 "--mount type=bind,src={},dst={GUEST_WORK_DIR} ",
251 wdl_work_dir.display()
252 )?;
253 write!(
254 &mut apptainer_command,
255 "--mount type=bind,src={},dst=/tmp ",
256 container_tmp_path.display()
257 )?;
258 write!(
259 &mut apptainer_command,
260 "--mount type=bind,src={},dst=/var/tmp ",
261 container_var_tmp_path.display()
262 )?;
263 write!(
264 &mut apptainer_command,
265 "--mount type=bind,src={},dst={GUEST_STDOUT_PATH} ",
266 wdl_stdout_path.display()
267 )?;
268 write!(
269 &mut apptainer_command,
270 "--mount type=bind,src={},dst={GUEST_STDERR_PATH} ",
271 wdl_stderr_path.display()
272 )?;
273 if let Some(true) = self
275 .spawn_request
276 .requirements()
277 .get(wdl_ast::v1::TASK_REQUIREMENT_GPU)
278 .and_then(Value::as_boolean)
279 {
280 write!(&mut apptainer_command, "--nv ")?;
281 }
282
283 if let Some(args) = &self.backend_config.extra_apptainer_exec_args {
285 for arg in args {
286 write!(&mut apptainer_command, "{arg} ")?;
287 }
288 }
289 write!(&mut apptainer_command, "{} ", container_sif.display())?;
291 write!(
294 &mut apptainer_command,
295 "bash -c \"{GUEST_COMMAND_PATH} > {GUEST_STDOUT_PATH} 2> {GUEST_STDERR_PATH}\" "
296 )?;
297 let apptainer_stdout_path = attempt_dir.join("apptainer.stdout");
299 let apptainer_stderr_path = attempt_dir.join("apptainer.stderr");
300 writeln!(
303 &mut apptainer_command,
304 "> {stdout} 2> {stderr}",
305 stdout = apptainer_stdout_path.display(),
306 stderr = apptainer_stderr_path.display()
307 )?;
308
309 fs::write(&apptainer_command_path, apptainer_command)
310 .await
311 .with_context(|| {
312 format!(
313 "failed to write Apptainer command file `{}`",
314 apptainer_command_path.display()
315 )
316 })?;
317 #[cfg(unix)]
318 tokio::fs::set_permissions(
319 &apptainer_command_path,
320 <std::fs::Permissions as std::os::unix::fs::PermissionsExt>::from_mode(0o770),
321 )
322 .await?;
323
324 let lsf_stdout_path = attempt_dir.join("lsf.stdout");
327 let lsf_stderr_path = attempt_dir.join("lsf.stderr");
328
329 let mut bsub_command = Command::new("bsub");
330
331 if let Some(queue) = self.backend_config.lsf_queue_for_task(
334 self.spawn_request.requirements(),
335 self.spawn_request.hints(),
336 ) {
337 bsub_command.arg("-q").arg(queue);
338 }
339
340 if let Some(true) = self
344 .spawn_request
345 .requirements()
346 .get(wdl_ast::v1::TASK_REQUIREMENT_GPU)
347 .and_then(Value::as_boolean)
348 {
349 match self.spawn_request.hints().get(wdl_ast::v1::TASK_HINT_GPU) {
350 Some(Value::Primitive(PrimitiveValue::Integer(n))) => {
351 bsub_command.arg("-gpu").arg(format!("num={n}/host"));
352 }
353 Some(Value::Primitive(PrimitiveValue::String(hint))) => {
354 warn!(
355 %hint,
356 "string hints for GPU are not supported; falling back to 1 GPU per host"
357 );
358 bsub_command.arg("-gpu").arg("num=1/host");
359 }
360 _ => {
363 bsub_command.arg("-gpu").arg("num=1/host");
364 }
365 }
366 }
367
368 if let Some(args) = &self.backend_config.extra_bsub_args {
370 bsub_command.args(args);
371 }
372
373 bsub_command
374 .stdout(Stdio::piped())
377 .stderr(Stdio::piped())
378 .env("LSB_JOB_REPORT_MAIL", "N")
381 .arg("-K")
388 .arg("-J")
391 .arg(&self.name)
392 .arg("-oo")
396 .arg(lsf_stdout_path)
397 .arg("-eo")
398 .arg(lsf_stderr_path)
399 .arg("-R")
401 .arg(format!(
402 "affinity[cpu({cpu})]",
403 cpu = self.cpu.ceil() as u64
404 ))
405 .arg("-R")
409 .arg(format!(
410 "rusage[mem={memory_kb}KB/job]",
411 memory_kb = self.memory / 1024
412 ))
413 .arg(apptainer_command_path);
414
415 let mut bsub_child = bsub_command.spawn()?;
416
417 crankshaft::events::send_event!(
418 self.crankshaft_events,
419 crankshaft::events::Event::TaskCreated {
420 id: crankshaft_task_id,
421 name: self.name.clone(),
422 tes_id: None,
423 token: self.cancellation_token.clone(),
424 },
425 );
426
427 let bsub_stdout = bsub_child
435 .stdout
436 .take()
437 .ok_or_else(|| anyhow!("bsub child stdout missing"))?;
438 let task_name = self.name.clone();
439 tokio::spawn(async move {
440 let mut lines = BufReader::new(bsub_stdout).lines();
441 while let Ok(Some(line)) = lines.next_line().await {
442 trace!(stdout = line, task_name);
443 }
444 });
445 let bsub_stderr = bsub_child
446 .stderr
447 .take()
448 .ok_or_else(|| anyhow!("bsub child stderr missing"))?;
449 let task_name = self.name.clone();
450 let stderr_crankshaft_events = self.crankshaft_events.clone();
451 tokio::spawn(async move {
452 let mut lines = BufReader::new(bsub_stderr).lines();
453 while let Ok(Some(line)) = lines.next_line().await {
454 if line.starts_with("<<Starting") {
455 crankshaft::events::send_event!(
456 stderr_crankshaft_events,
457 crankshaft::events::Event::TaskStarted {
458 id: crankshaft_task_id
459 },
460 );
461 }
462 trace!(stderr = line, task_name);
463 }
464 });
465
466 let bsub_result = tokio::select! {
469 _ = self.cancellation_token.cancelled() => {
470 crankshaft::events::send_event!(
471 self.crankshaft_events,
472 crankshaft::events::Event::TaskCanceled {
473 id: crankshaft_task_id
474 },
475 );
476 Err(anyhow!("task execution cancelled"))
477 }
478 result = bsub_child.wait() => result.map_err(Into::into),
479 }?;
480
481 crankshaft::events::send_event!(
482 self.crankshaft_events,
483 crankshaft::events::Event::TaskCompleted {
484 id: crankshaft_task_id,
485 exit_statuses: NonEmpty::new(bsub_result),
486 }
487 );
488
489 Ok(TaskExecutionResult {
490 exit_code: bsub_result
496 .code()
497 .ok_or(anyhow!("task did not return an exit code"))?,
498 work_dir: EvaluationPath::Local(wdl_work_dir),
499 stdout: PrimitiveValue::new_file(
500 wdl_stdout_path
501 .into_os_string()
502 .into_string()
503 .expect("path should be UTF-8"),
504 )
505 .into(),
506 stderr: PrimitiveValue::new_file(
507 wdl_stderr_path
508 .into_os_string()
509 .into_string()
510 .expect("path should be UTF-8"),
511 )
512 .into(),
513 })
514 }
515}
516
/// Task execution backend that submits each task to LSF with `bsub`, running the task's command
/// inside an Apptainer container.
#[derive(Debug)]
pub struct LsfApptainerBackend {
    /// Engine-wide configuration; supplies the default task container.
    engine_config: Arc<Config>,
    /// LSF + Apptainer specific configuration, shared with every task request.
    backend_config: Arc<LsfApptainerBackendConfig>,
    /// Manager to which task requests are sent for execution.
    manager: TaskManager<LsfApptainerTaskRequest>,
    /// Optional Crankshaft event channel, cloned into each task request.
    crankshaft_events: Option<broadcast::Sender<Event>>,
}
527
528impl LsfApptainerBackend {
529 pub fn new(
531 engine_config: Arc<Config>,
532 backend_config: Arc<LsfApptainerBackendConfig>,
533 crankshaft_events: Option<broadcast::Sender<Event>>,
534 ) -> Self {
535 Self {
536 engine_config,
537 backend_config,
538 manager: TaskManager::new_unlimited(u64::MAX, u64::MAX),
543 crankshaft_events,
544 }
545 }
546}
547
548impl TaskExecutionBackend for LsfApptainerBackend {
549 fn max_concurrency(&self) -> u64 {
550 self.backend_config.max_scatter_concurrency
551 }
552
553 fn constraints(
554 &self,
555 requirements: &std::collections::HashMap<String, crate::Value>,
556 _hints: &std::collections::HashMap<String, crate::Value>,
557 ) -> anyhow::Result<super::TaskExecutionConstraints> {
558 Ok(super::TaskExecutionConstraints {
559 container: Some(
560 v1::container(requirements, self.engine_config.task.container.as_deref())
561 .into_owned(),
562 ),
563 cpu: f64::MAX,
569 memory: i64::MAX,
570 gpu: Default::default(),
571 fpga: Default::default(),
572 disks: Default::default(),
573 })
574 }
575
576 fn guest_inputs_dir(&self) -> Option<&'static str> {
577 Some(GUEST_INPUTS_DIR)
578 }
579
580 fn needs_local_inputs(&self) -> bool {
581 true
582 }
583
584 fn spawn(
585 &self,
586 request: TaskSpawnRequest,
587 cancellation_token: CancellationToken,
588 ) -> anyhow::Result<tokio::sync::oneshot::Receiver<anyhow::Result<TaskExecutionResult>>> {
589 let (completed_tx, completed_rx) = tokio::sync::oneshot::channel();
590
591 let requirements = request.requirements();
592 let hints = request.hints();
593
594 let container =
595 v1::container(requirements, self.engine_config.task.container.as_deref()).into_owned();
596 let cpu = v1::cpu(requirements);
597 let memory = v1::memory(requirements)? as u64;
598 let _max_cpu = v1::max_cpu(hints);
603 let _max_memory = v1::max_memory(hints)?.map(|i| i as u64);
605
606 let request_id = request.id();
608 let name = if request_id.len() > LSF_JOB_NAME_MAX_LENGTH {
609 request_id
610 .chars()
611 .take(LSF_JOB_NAME_MAX_LENGTH)
612 .collect::<String>()
613 } else {
614 request_id.to_string()
615 };
616
617 self.manager.send(
618 LsfApptainerTaskRequest {
619 backend_config: self.backend_config.clone(),
620 spawn_request: request,
621 name,
622 container,
623 cpu,
624 memory,
625 crankshaft_events: self.crankshaft_events.clone(),
626 cancellation_token,
627 },
628 completed_tx,
629 );
630
631 Ok(completed_rx)
632 }
633
634 fn cleanup<'a>(
635 &'a self,
636 _work_dir: &'a EvaluationPath,
637 _token: CancellationToken,
638 ) -> Option<futures::future::BoxFuture<'a, ()>> {
639 None
644 }
645}
646
/// Configuration for the LSF + Apptainer task execution backend.
#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
pub struct LsfApptainerBackendConfig {
    /// Queue used when none of the more specific queues below applies to a task.
    pub default_lsf_queue: Option<String>,
    /// Queue used for tasks whose `short_task` hint is `true` (unless an FPGA or GPU queue
    /// applies first).
    pub short_task_lsf_queue: Option<String>,
    /// Queue used for tasks whose `gpu` requirement is `true` (unless an FPGA queue applies).
    pub gpu_lsf_queue: Option<String>,
    /// Queue used for tasks whose `fpga` requirement is `true`; takes priority over the others.
    pub fpga_lsf_queue: Option<String>,
    /// Extra arguments appended to every `bsub` invocation.
    pub extra_bsub_args: Option<Vec<String>>,
    /// Maximum scatter concurrency reported by the backend; defaults to 200 when omitted.
    #[serde(default = "default_max_scatter_concurrency")]
    pub max_scatter_concurrency: u64,
    /// Extra arguments written verbatim into the `apptainer exec` command line, before the
    /// image path.
    pub extra_apptainer_exec_args: Option<Vec<String>>,
    /// Directory for Apptainer images (presumably the SIF cache used by the `images` module —
    /// confirm). Defaults to `sprocket-apptainer-images` under the user cache directory or the
    /// temporary directory.
    #[serde(default = "default_apptainer_images_dir")]
    pub apptainer_images_dir: PathBuf,
}
700
/// Scatter concurrency used when the configuration does not specify one.
fn default_max_scatter_concurrency() -> u64 {
    const DEFAULT_SCATTER_CONCURRENCY: u64 = 200;
    DEFAULT_SCATTER_CONCURRENCY
}
704
705fn default_apptainer_images_dir() -> PathBuf {
706 if let Some(cache) = dirs::cache_dir() {
707 cache.join("sprocket-apptainer-images").to_path_buf()
708 } else {
709 std::env::temp_dir()
710 .join("sprocket-apptainer-images")
711 .to_path_buf()
712 }
713}
714
715impl Default for LsfApptainerBackendConfig {
716 fn default() -> Self {
717 Self {
718 default_lsf_queue: None,
719 short_task_lsf_queue: None,
720 gpu_lsf_queue: None,
721 fpga_lsf_queue: None,
722 extra_bsub_args: None,
723 max_scatter_concurrency: default_max_scatter_concurrency(),
724 apptainer_images_dir: default_apptainer_images_dir(),
725 extra_apptainer_exec_args: None,
726 }
727 }
728}
729
730impl LsfApptainerBackendConfig {
731 pub async fn validate(&self, engine_config: &Config) -> Result<(), anyhow::Error> {
733 if cfg!(not(unix)) {
734 bail!("LSF + Apptainer backend is not supported on non-unix platforms");
735 }
736 if !engine_config.experimental_features_enabled {
737 bail!("LSF + Apptainer backend requires enabling experimental features");
738 }
739
740 if let Some(queue) = &self.default_lsf_queue {
746 validate_lsf_queue("default", queue).await?;
747 }
748 if let Some(queue) = &self.short_task_lsf_queue {
749 validate_lsf_queue("short_task", queue).await?;
750 }
751 if let Some(queue) = &self.gpu_lsf_queue {
752 validate_lsf_queue("gpu", queue).await?;
753 }
754 if let Some(queue) = &self.fpga_lsf_queue {
755 validate_lsf_queue("fpga", queue).await?;
756 }
757 Ok(())
758 }
759
760 fn lsf_queue_for_task(
765 &self,
766 requirements: &HashMap<String, Value>,
767 hints: &HashMap<String, Value>,
768 ) -> Option<&str> {
769 if let Some(queue) = self.fpga_lsf_queue.as_deref()
775 && let Some(true) = requirements
776 .get(wdl_ast::v1::TASK_REQUIREMENT_FPGA)
777 .and_then(Value::as_boolean)
778 {
779 return Some(queue);
780 }
781
782 if let Some(queue) = self.gpu_lsf_queue.as_deref()
783 && let Some(true) = requirements
784 .get(wdl_ast::v1::TASK_REQUIREMENT_GPU)
785 .and_then(Value::as_boolean)
786 {
787 return Some(queue);
788 }
789
790 if let Some(queue) = self.short_task_lsf_queue.as_deref()
792 && let Some(true) = hints
793 .get(wdl_ast::v1::TASK_HINT_SHORT_TASK)
794 .and_then(Value::as_boolean)
795 {
796 return Some(queue);
797 }
798
799 self.default_lsf_queue.as_deref()
802 }
803}
804
805async fn validate_lsf_queue(name: &str, queue: &str) -> Result<(), anyhow::Error> {
806 match tokio::time::timeout(
807 std::time::Duration::from_secs(10),
810 Command::new("bqueues").arg(queue).output(),
811 )
812 .await
813 {
814 Ok(output) => {
815 let output = output.context("validating LSF queue")?;
816 if !output.status.success() {
817 let stdout = String::from_utf8_lossy(&output.stdout);
818 let stderr = String::from_utf8_lossy(&output.stderr);
819 error!(%stdout, %stderr, %queue, "failed to validate {name}_lsf_queue");
820 Err(anyhow!("failed to validate {name}_lsf_queue `{queue}`"))
821 } else {
822 Ok(())
823 }
824 }
825 Err(_) => Err(anyhow!(
826 "timed out trying to validate {name}_lsf_queue `{queue}`"
827 )),
828 }
829}