1use std::collections::HashMap;
14use std::path::Path;
15use std::process::Stdio;
16use std::sync::Arc;
17
18use anyhow::Context as _;
19use anyhow::anyhow;
20use anyhow::bail;
21use anyhow::ensure;
22use bytesize::ByteSize;
23use crankshaft::events::Event;
24use nonempty::NonEmpty;
25use tokio::fs::File;
26use tokio::fs::{self};
27use tokio::io::AsyncBufReadExt;
28use tokio::io::BufReader;
29use tokio::process::Command;
30use tokio::sync::broadcast;
31use tokio_util::sync::CancellationToken;
32use tracing::debug;
33use tracing::error;
34use tracing::trace;
35use tracing::warn;
36
37use super::ApptainerState;
38use super::TaskExecutionBackend;
39use super::TaskManager;
40use super::TaskManagerRequest;
41use super::TaskSpawnRequest;
42use super::apptainer::ApptainerConfig;
43use crate::ONE_GIBIBYTE;
44use crate::PrimitiveValue;
45use crate::TaskExecutionResult;
46use crate::Value;
47use crate::config::Config;
48use crate::config::TaskResourceLimitBehavior;
49use crate::path::EvaluationPath;
50use crate::v1;
51
/// File name (within the attempt directory) of the generated script that
/// invokes Apptainer for the task; this script is what gets submitted to LSF.
const APPTAINER_COMMAND_FILE_NAME: &str = "apptainer_command";

/// Guest-side directory at which task inputs are made available inside the
/// container.
const GUEST_INPUTS_DIR: &str = "/mnt/task/inputs/";

/// Maximum length of an LSF job name (`bsub -J`).
// NOTE(review): elsewhere this is compared against `str::len()` (bytes) but
// truncation is done by `chars()`; assumes request ids are ASCII — confirm.
const LSF_JOB_NAME_MAX_LENGTH: usize = 4094;
62
/// A task-manager request to run a single WDL task attempt via LSF +
/// Apptainer.
#[derive(Debug)]
struct LsfApptainerTaskRequest {
    /// Backend configuration (queues, extra `bsub` arguments, Apptainer
    /// settings).
    backend_config: Arc<LsfApptainerBackendConfig>,
    /// Shared Apptainer state used to build the container invocation script.
    apptainer_state: Arc<ApptainerState>,
    /// LSF job name (derived from the spawn request id, possibly truncated).
    name: String,
    /// The task spawn request being executed.
    spawn_request: TaskSpawnRequest,
    /// Container image reference for the task.
    container: String,
    /// CPUs required by the task (possibly clamped to a queue maximum).
    required_cpu: f64,
    /// Memory required by the task (possibly clamped to a queue maximum).
    required_memory: ByteSize,
    /// Optional channel on which crankshaft task lifecycle events are sent.
    crankshaft_events: Option<broadcast::Sender<Event>>,
    /// Token used to cancel execution of the task.
    cancellation_token: CancellationToken,
}
91
impl TaskManagerRequest for LsfApptainerTaskRequest {
    /// CPUs this request needs, as reported to the task manager.
    fn cpu(&self) -> f64 {
        self.required_cpu
    }

    /// Memory (in bytes) this request needs, as reported to the task manager.
    fn memory(&self) -> u64 {
        self.required_memory.as_u64()
    }

    /// Executes the task attempt: prepares host-side directories and files,
    /// writes the WDL command and Apptainer wrapper scripts, submits the
    /// wrapper to LSF via `bsub -K`, forwards crankshaft lifecycle events,
    /// and collects the final exit status.
    async fn run(self) -> anyhow::Result<super::TaskExecutionResult> {
        let crankshaft_task_id = crankshaft::events::next_task_id();

        let attempt_dir = self.spawn_request.attempt_dir();

        // Create the WDL working directory for this attempt.
        let wdl_work_dir = self.spawn_request.wdl_work_dir_host_path();
        fs::create_dir_all(&wdl_work_dir).await.with_context(|| {
            format!(
                "failed to create WDL working directory `{path}`",
                path = wdl_work_dir.display()
            )
        })?;

        // Pre-create the WDL stdout/stderr files so they exist even if the
        // task never runs; only the paths are kept (handles are dropped).
        let wdl_stdout_path = self.spawn_request.wdl_stdout_host_path();
        let _ = File::create(&wdl_stdout_path).await.with_context(|| {
            format!(
                "failed to create WDL stdout file `{path}`",
                path = wdl_stdout_path.display()
            )
        })?;

        let wdl_stderr_path = self.spawn_request.wdl_stderr_host_path();
        let _ = File::create(&wdl_stderr_path).await.with_context(|| {
            format!(
                "failed to create WDL stderr file `{path}`",
                path = wdl_stderr_path.display()
            )
        })?;

        // Write the WDL command script and mark it executable (0o770).
        let wdl_command_path = self.spawn_request.wdl_command_host_path();
        fs::write(&wdl_command_path, self.spawn_request.command())
            .await
            .with_context(|| {
                format!(
                    "failed to write WDL command contents to `{path}`",
                    path = wdl_command_path.display()
                )
            })?;
        #[cfg(unix)]
        fs::set_permissions(
            &wdl_command_path,
            <std::fs::Permissions as std::os::unix::fs::PermissionsExt>::from_mode(0o770),
        )
        .await?;

        // Build the Apptainer invocation for the container and write it to an
        // executable script in the attempt directory; this script is the unit
        // of work submitted to LSF below.
        let apptainer_command = self
            .apptainer_state
            .prepare_apptainer_command(
                &self.container,
                self.cancellation_token.clone(),
                &self.spawn_request,
            )
            .await?;

        let apptainer_command_path = attempt_dir.join(APPTAINER_COMMAND_FILE_NAME);
        fs::write(&apptainer_command_path, apptainer_command)
            .await
            .with_context(|| {
                format!(
                    "failed to write Apptainer command file `{}`",
                    apptainer_command_path.display()
                )
            })?;
        #[cfg(unix)]
        tokio::fs::set_permissions(
            &apptainer_command_path,
            <std::fs::Permissions as std::os::unix::fs::PermissionsExt>::from_mode(0o770),
        )
        .await?;

        // LSF's own job output files (distinct from the task's WDL
        // stdout/stderr above).
        let lsf_stdout_path = attempt_dir.join("lsf.stdout");
        let lsf_stderr_path = attempt_dir.join("lsf.stderr");

        let mut bsub_command = Command::new("bsub");

        // Route the job to the configured queue for this task, if any.
        if let Some(queue) = self.backend_config.lsf_queue_for_task(
            self.spawn_request.requirements(),
            self.spawn_request.hints(),
        ) {
            bsub_command.arg("-q").arg(queue.name());
        }

        // Request GPUs co-located on a single host when the task asks for
        // them.
        if let Some(n_gpu) = v1::gpu(
            self.spawn_request.requirements(),
            self.spawn_request.hints(),
        ) {
            bsub_command.arg("-gpu").arg(format!("num={n_gpu}/host"));
        }

        // User-configured extra `bsub` arguments precede the built-in ones.
        if let Some(args) = &self.backend_config.extra_bsub_args {
            bsub_command.args(args);
        }

        bsub_command
            .stdout(Stdio::piped())
            .stderr(Stdio::piped())
            // Suppress LSF job report mail.
            .env("LSB_JOB_REPORT_MAIL", "N")
            // `-K` makes `bsub` wait for the job to complete.
            .arg("-K")
            .arg("-J")
            .arg(&self.name)
            // `-oo`/`-eo` overwrite any pre-existing LSF output files.
            .arg("-oo")
            .arg(lsf_stdout_path)
            .arg("-eo")
            .arg(lsf_stderr_path)
            // Bind the job to the requested (rounded-up) number of CPUs.
            .arg("-R")
            .arg(format!(
                "affinity[cpu({cpu})]",
                cpu = self.required_cpu.ceil() as u64
            ))
            // NOTE(review): value is bytes / 1024 with a `KB` suffix — this
            // assumes LSF interprets `KB` as kibibytes (the
            // LSF_UNIT_FOR_LIMITS default); confirm on the target cluster.
            .arg("-R")
            .arg(format!(
                "rusage[mem={memory_kb}KB/job]",
                memory_kb = self.required_memory.as_u64() / bytesize::KIB,
            ))
            .arg(apptainer_command_path);

        debug!(?bsub_command, "spawning `bsub` command");

        let mut bsub_child = bsub_command.spawn()?;

        crankshaft::events::send_event!(
            self.crankshaft_events,
            crankshaft::events::Event::TaskCreated {
                id: crankshaft_task_id,
                name: self.name.clone(),
                tes_id: None,
                token: self.cancellation_token.clone(),
            },
        );

        // Drain `bsub` stdout in the background, logging each line at trace
        // level.
        let bsub_stdout = bsub_child
            .stdout
            .take()
            .ok_or_else(|| anyhow!("bsub child stdout missing"))?;
        let task_name = self.name.clone();
        tokio::spawn(async move {
            let mut lines = BufReader::new(bsub_stdout).lines();
            while let Ok(Some(line)) = lines.next_line().await {
                trace!(stdout = line, task_name);
            }
        });
        // Drain `bsub` stderr likewise; `bsub -K` prints a `<<Starting ...>>`
        // line there when the job begins, which triggers `TaskStarted`.
        let bsub_stderr = bsub_child
            .stderr
            .take()
            .ok_or_else(|| anyhow!("bsub child stderr missing"))?;
        let task_name = self.name.clone();
        let stderr_crankshaft_events = self.crankshaft_events.clone();
        tokio::spawn(async move {
            let mut lines = BufReader::new(bsub_stderr).lines();
            while let Ok(Some(line)) = lines.next_line().await {
                if line.starts_with("<<Starting") {
                    crankshaft::events::send_event!(
                        stderr_crankshaft_events,
                        crankshaft::events::Event::TaskStarted {
                            id: crankshaft_task_id
                        },
                    );
                }
                trace!(stderr = line, task_name);
            }
        });

        // Wait for `bsub` to finish unless the task is cancelled first.
        // NOTE(review): on cancellation the `bsub` child is not killed here —
        // presumably cancellation is propagated via the token given to
        // `prepare_apptainer_command`; confirm the LSF job is also
        // terminated.
        let bsub_result = tokio::select! {
            _ = self.cancellation_token.cancelled() => {
                crankshaft::events::send_event!(
                    self.crankshaft_events,
                    crankshaft::events::Event::TaskCanceled {
                        id: crankshaft_task_id
                    },
                );
                Err(anyhow!("task execution cancelled"))
            }
            result = bsub_child.wait() => result.map_err(Into::into),
        }?;

        crankshaft::events::send_event!(
            self.crankshaft_events,
            crankshaft::events::Event::TaskCompleted {
                id: crankshaft_task_id,
                exit_statuses: NonEmpty::new(bsub_result),
            }
        );

        Ok(TaskExecutionResult {
            // An exit code must be present; `None` (e.g. terminated by a
            // signal) is reported as an error.
            exit_code: bsub_result
                .code()
                .ok_or(anyhow!("task did not return an exit code"))?,
            work_dir: EvaluationPath::Local(wdl_work_dir),
            stdout: PrimitiveValue::new_file(
                wdl_stdout_path
                    .into_os_string()
                    .into_string()
                    .expect("path should be UTF-8"),
            )
            .into(),
            stderr: PrimitiveValue::new_file(
                wdl_stderr_path
                    .into_os_string()
                    .into_string()
                    .expect("path should be UTF-8"),
            )
            .into(),
        })
    }
}
349
/// A task execution backend that submits each task to LSF, wrapping the
/// task's command in an Apptainer container invocation.
#[derive(Debug)]
pub struct LsfApptainerBackend {
    /// Engine-wide configuration.
    engine_config: Arc<Config>,
    /// Backend-specific (LSF + Apptainer) configuration.
    backend_config: Arc<LsfApptainerBackendConfig>,
    /// Task manager through which task requests are queued and run.
    manager: TaskManager<LsfApptainerTaskRequest>,
    /// Optional channel on which crankshaft task lifecycle events are sent.
    crankshaft_events: Option<broadcast::Sender<Event>>,
    /// Shared Apptainer state used to prepare container invocations.
    apptainer_state: Arc<ApptainerState>,
}
366
367impl LsfApptainerBackend {
368 pub fn new(
375 run_root_dir: &Path,
376 engine_config: Arc<Config>,
377 backend_config: Arc<LsfApptainerBackendConfig>,
378 crankshaft_events: Option<broadcast::Sender<Event>>,
379 ) -> Self {
380 let apptainer_state =
381 ApptainerState::new(&backend_config.apptainer_config, run_root_dir).into();
382 Self {
383 engine_config,
384 backend_config,
385 manager: TaskManager::new_unlimited(u64::MAX, u64::MAX),
390 crankshaft_events,
391 apptainer_state,
392 }
393 }
394}
395
396impl TaskExecutionBackend for LsfApptainerBackend {
397 fn max_concurrency(&self) -> u64 {
398 self.backend_config.max_scatter_concurrency
399 }
400
401 fn constraints(
402 &self,
403 requirements: &HashMap<String, Value>,
404 hints: &HashMap<String, Value>,
405 ) -> anyhow::Result<super::TaskExecutionConstraints> {
406 let mut required_cpu = v1::cpu(requirements);
407 let mut required_memory = ByteSize::b(v1::memory(requirements)? as u64);
408
409 if let Some(queue) = self.backend_config.lsf_queue_for_task(requirements, hints) {
416 if let Some(max_cpu) = queue.max_cpu_per_task()
417 && required_cpu > max_cpu as f64
418 {
419 let env_specific = if self.engine_config.suppress_env_specific_output {
420 String::new()
421 } else {
422 format!(", but the execution backend has a maximum of {max_cpu}",)
423 };
424 match self.engine_config.task.cpu_limit_behavior {
425 TaskResourceLimitBehavior::TryWithMax => {
426 warn!(
427 "task requires at least {required_cpu} CPU{s}{env_specific}",
428 s = if required_cpu == 1.0 { "" } else { "s" },
429 );
430 required_cpu = max_cpu as f64;
432 }
433 TaskResourceLimitBehavior::Deny => {
434 bail!(
435 "task requires at least {required_cpu} CPU{s}{env_specific}",
436 s = if required_cpu == 1.0 { "" } else { "s" },
437 );
438 }
439 }
440 }
441 if let Some(max_memory) = queue.max_memory_per_task()
442 && required_memory > max_memory
443 {
444 let env_specific = if self.engine_config.suppress_env_specific_output {
445 String::new()
446 } else {
447 format!(
448 ", but the execution backend has a maximum of {max_memory} GiB",
449 max_memory = max_memory.as_u64() as f64 / ONE_GIBIBYTE
450 )
451 };
452 match self.engine_config.task.memory_limit_behavior {
453 TaskResourceLimitBehavior::TryWithMax => {
454 warn!(
455 "task requires at least {required_memory} GiB of memory{env_specific}",
456 required_memory = required_memory.as_u64() as f64 / ONE_GIBIBYTE
457 );
458 required_memory = max_memory;
460 }
461 TaskResourceLimitBehavior::Deny => {
462 bail!(
463 "task requires at least {required_memory} GiB of memory{env_specific}",
464 required_memory = required_memory.as_u64() as f64 / ONE_GIBIBYTE
465 );
466 }
467 }
468 }
469 }
470 Ok(super::TaskExecutionConstraints {
471 container: Some(
472 v1::container(requirements, self.engine_config.task.container.as_deref())
473 .into_owned(),
474 ),
475 cpu: required_cpu,
476 memory: required_memory.as_u64().try_into().unwrap_or(i64::MAX),
477 gpu: Default::default(),
479 fpga: Default::default(),
480 disks: Default::default(),
481 })
482 }
483
484 fn guest_inputs_dir(&self) -> Option<&'static str> {
485 Some(GUEST_INPUTS_DIR)
486 }
487
488 fn needs_local_inputs(&self) -> bool {
489 true
490 }
491
492 fn spawn(
493 &self,
494 request: TaskSpawnRequest,
495 cancellation_token: CancellationToken,
496 ) -> anyhow::Result<tokio::sync::oneshot::Receiver<anyhow::Result<TaskExecutionResult>>> {
497 let (completed_tx, completed_rx) = tokio::sync::oneshot::channel();
498
499 let requirements = request.requirements();
500 let hints = request.hints();
501
502 let container =
503 v1::container(requirements, self.engine_config.task.container.as_deref()).into_owned();
504
505 let mut required_cpu = v1::cpu(requirements);
506 let mut required_memory = ByteSize::b(v1::memory(requirements)? as u64);
507
508 if let Some(queue) = self.backend_config.lsf_queue_for_task(requirements, hints) {
511 if let Some(max_cpu) = queue.max_cpu_per_task()
512 && required_cpu > max_cpu as f64
513 {
514 let env_specific = if self.engine_config.suppress_env_specific_output {
515 String::new()
516 } else {
517 format!(", but the execution backend has a maximum of {max_cpu}",)
518 };
519 match self.engine_config.task.cpu_limit_behavior {
520 TaskResourceLimitBehavior::TryWithMax => {
521 warn!(
522 "task requires at least {required_cpu} CPU{s}{env_specific}",
523 s = if required_cpu == 1.0 { "" } else { "s" },
524 );
525 required_cpu = max_cpu as f64;
527 }
528 TaskResourceLimitBehavior::Deny => {
529 bail!(
530 "task requires at least {required_cpu} CPU{s}{env_specific}",
531 s = if required_cpu == 1.0 { "" } else { "s" },
532 );
533 }
534 }
535 }
536 if let Some(max_memory) = queue.max_memory_per_task()
537 && required_memory > max_memory
538 {
539 let env_specific = if self.engine_config.suppress_env_specific_output {
540 String::new()
541 } else {
542 format!(
543 ", but the execution backend has a maximum of {max_memory} GiB",
544 max_memory = max_memory.as_u64() as f64 / ONE_GIBIBYTE
545 )
546 };
547 match self.engine_config.task.memory_limit_behavior {
548 TaskResourceLimitBehavior::TryWithMax => {
549 warn!(
550 "task requires at least {required_memory} GiB of memory{env_specific}",
551 required_memory = required_memory.as_u64() as f64 / ONE_GIBIBYTE
552 );
553 required_memory = max_memory;
555 }
556 TaskResourceLimitBehavior::Deny => {
557 bail!(
558 "task requires at least {required_memory} GiB of memory{env_specific}",
559 required_memory = required_memory.as_u64() as f64 / ONE_GIBIBYTE
560 );
561 }
562 }
563 }
564 }
565
566 let _max_cpu = v1::max_cpu(hints);
571 let _max_memory = v1::max_memory(hints)?.map(|i| i as u64);
573
574 let request_id = request.id();
576 let name = if request_id.len() > LSF_JOB_NAME_MAX_LENGTH {
577 request_id
578 .chars()
579 .take(LSF_JOB_NAME_MAX_LENGTH)
580 .collect::<String>()
581 } else {
582 request_id.to_string()
583 };
584
585 self.manager.send(
586 LsfApptainerTaskRequest {
587 backend_config: self.backend_config.clone(),
588 apptainer_state: self.apptainer_state.clone(),
589 spawn_request: request,
590 name,
591 container,
592 required_cpu,
593 required_memory,
594 crankshaft_events: self.crankshaft_events.clone(),
595 cancellation_token,
596 },
597 completed_tx,
598 );
599
600 Ok(completed_rx)
601 }
602
603 fn cleanup<'a>(
604 &'a self,
605 _work_dir: &'a EvaluationPath,
606 _token: CancellationToken,
607 ) -> Option<futures::future::BoxFuture<'a, ()>> {
608 None
613 }
614}
615
/// Configuration for a single LSF queue.
#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
pub struct LsfQueueConfig {
    /// The LSF queue name (passed to `bsub -q`).
    name: String,
    /// Maximum number of CPUs a single task may request on this queue.
    max_cpu_per_task: Option<u64>,
    /// Maximum amount of memory a single task may request on this queue.
    max_memory_per_task: Option<ByteSize>,
}
634
635impl LsfQueueConfig {
636 pub fn new(
638 name: String,
639 max_cpu_per_task: Option<u64>,
640 max_memory_per_task: Option<ByteSize>,
641 ) -> Self {
642 Self {
643 name,
644 max_cpu_per_task,
645 max_memory_per_task,
646 }
647 }
648
649 pub fn name(&self) -> &str {
652 &self.name
653 }
654
655 pub fn max_cpu_per_task(&self) -> Option<u64> {
657 self.max_cpu_per_task
658 }
659
660 pub fn max_memory_per_task(&self) -> Option<ByteSize> {
662 self.max_memory_per_task
663 }
664
665 async fn validate(&self, name: &str) -> Result<(), anyhow::Error> {
667 let queue = self.name();
668 ensure!(!queue.is_empty(), "{name}_lsf_queue name cannot be empty");
669 if let Some(max_cpu_per_task) = self.max_cpu_per_task() {
670 ensure!(
671 max_cpu_per_task > 0,
672 "{name}_lsf_queue `{queue}` must allow at least 1 CPU to be provisioned"
673 );
674 }
675 if let Some(max_memory_per_task) = self.max_memory_per_task() {
676 ensure!(
677 max_memory_per_task.as_u64() > 0,
678 "{name}_lsf_queue `{queue}` must allow at least some memory to be provisioned"
679 );
680 }
681 match tokio::time::timeout(
682 std::time::Duration::from_secs(10),
685 Command::new("bqueues").arg(queue).output(),
686 )
687 .await
688 {
689 Ok(output) => {
690 let output = output.context("validating LSF queue")?;
691 if !output.status.success() {
692 let stdout = String::from_utf8_lossy(&output.stdout);
693 let stderr = String::from_utf8_lossy(&output.stderr);
694 error!(%stdout, %stderr, %queue, "failed to validate {name}_lsf_queue");
695 Err(anyhow!("failed to validate {name}_lsf_queue `{queue}`"))
696 } else {
697 Ok(())
698 }
699 }
700 Err(_) => Err(anyhow!(
701 "timed out trying to validate {name}_lsf_queue `{queue}`"
702 )),
703 }
704 }
705}
706
/// Configuration for the LSF + Apptainer task execution backend.
#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
pub struct LsfApptainerBackendConfig {
    /// Queue used when no more specific queue applies.
    pub default_lsf_queue: Option<LsfQueueConfig>,
    /// Queue used for tasks carrying the `short_task` hint.
    pub short_task_lsf_queue: Option<LsfQueueConfig>,
    /// Queue used for tasks with the `gpu` requirement.
    pub gpu_lsf_queue: Option<LsfQueueConfig>,
    /// Queue used for tasks with the `fpga` requirement.
    pub fpga_lsf_queue: Option<LsfQueueConfig>,
    /// Extra arguments appended to every `bsub` invocation.
    pub extra_bsub_args: Option<Vec<String>>,
    /// Maximum task concurrency reported by the backend.
    #[serde(default = "default_max_scatter_concurrency")]
    pub max_scatter_concurrency: u64,
    /// Apptainer-specific configuration (flattened into this table when
    /// (de)serialized).
    #[serde(default)]
    #[serde(flatten)]
    pub apptainer_config: ApptainerConfig,
}
758
/// Default value for [`LsfApptainerBackendConfig::max_scatter_concurrency`].
fn default_max_scatter_concurrency() -> u64 {
    const DEFAULT: u64 = 200;
    DEFAULT
}
762
763impl Default for LsfApptainerBackendConfig {
764 fn default() -> Self {
765 Self {
766 default_lsf_queue: None,
767 short_task_lsf_queue: None,
768 gpu_lsf_queue: None,
769 fpga_lsf_queue: None,
770 extra_bsub_args: None,
771 max_scatter_concurrency: default_max_scatter_concurrency(),
772 apptainer_config: ApptainerConfig::default(),
773 }
774 }
775}
776
777impl LsfApptainerBackendConfig {
778 pub async fn validate(&self, engine_config: &Config) -> Result<(), anyhow::Error> {
780 if cfg!(not(unix)) {
781 bail!("LSF + Apptainer backend is not supported on non-unix platforms");
782 }
783 if !engine_config.experimental_features_enabled {
784 bail!("LSF + Apptainer backend requires enabling experimental features");
785 }
786
787 if let Some(queue) = self.default_lsf_queue.as_ref() {
793 queue.validate("default").await?;
794 }
795 if let Some(queue) = self.short_task_lsf_queue.as_ref() {
796 queue.validate("short_task").await?;
797 }
798 if let Some(queue) = self.gpu_lsf_queue.as_ref() {
799 queue.validate("gpu").await?;
800 }
801 if let Some(queue) = self.fpga_lsf_queue.as_ref() {
802 queue.validate("fpga").await?;
803 }
804
805 self.apptainer_config.validate().await?;
806
807 Ok(())
808 }
809
810 fn lsf_queue_for_task(
815 &self,
816 requirements: &HashMap<String, Value>,
817 hints: &HashMap<String, Value>,
818 ) -> Option<&LsfQueueConfig> {
819 if let Some(queue) = self.fpga_lsf_queue.as_ref()
825 && let Some(true) = requirements
826 .get(wdl_ast::v1::TASK_REQUIREMENT_FPGA)
827 .and_then(Value::as_boolean)
828 {
829 return Some(queue);
830 }
831
832 if let Some(queue) = self.gpu_lsf_queue.as_ref()
833 && let Some(true) = requirements
834 .get(wdl_ast::v1::TASK_REQUIREMENT_GPU)
835 .and_then(Value::as_boolean)
836 {
837 return Some(queue);
838 }
839
840 if let Some(queue) = self.short_task_lsf_queue.as_ref()
842 && let Some(true) = hints
843 .get(wdl_ast::v1::TASK_HINT_SHORT_TASK)
844 .and_then(Value::as_boolean)
845 {
846 return Some(queue);
847 }
848
849 self.default_lsf_queue.as_ref()
852 }
853}