1use std::collections::BTreeSet;
4use std::collections::HashMap;
5use std::collections::HashSet;
6use std::fs::File;
7use std::io::BufReader;
8use std::path::Path;
9
10use anyhow::Context;
11use anyhow::Result;
12use anyhow::bail;
13use indexmap::IndexMap;
14use serde::Serialize;
15use serde::ser::SerializeMap;
16use serde_json::Value as JsonValue;
17use serde_yaml_ng::Value as YamlValue;
18use wdl_analysis::Document;
19use wdl_analysis::document::Input;
20use wdl_analysis::document::Task;
21use wdl_analysis::document::Workflow;
22use wdl_analysis::types::CallKind;
23use wdl_analysis::types::Coercible as _;
24use wdl_analysis::types::Optional;
25use wdl_analysis::types::PrimitiveType;
26use wdl_analysis::types::display_types;
27use wdl_analysis::types::v1::task_hint_types;
28use wdl_analysis::types::v1::task_requirement_types;
29use wdl_ast::SupportedVersion;
30use wdl_ast::version::V1;
31
32use crate::Coercible;
33use crate::EvaluationPath;
34use crate::Value;
35
/// A JSON object: a map from string keys to JSON values.
///
/// This is the shape of a top-level inputs document accepted by
/// `Inputs::parse_json_object`.
pub type JsonMap = serde_json::Map<String, JsonValue>;
38
39fn check_input_type(document: &Document, name: &str, input: &Input, value: &Value) -> Result<()> {
41 let expected_ty = if !input.required()
45 && document
46 .version()
47 .map(|v| v >= SupportedVersion::V1(V1::Two))
48 .unwrap_or(false)
49 {
50 input.ty().optional()
51 } else {
52 input.ty().clone()
53 };
54
55 let ty = value.ty();
56 if !ty.is_coercible_to(&expected_ty) {
57 bail!("expected type `{expected_ty}` for input `{name}`, but found `{ty}`");
58 }
59
60 Ok(())
61}
62
/// The inputs to a task, plus any overridden requirement and hint values.
#[derive(Default, Debug, Clone)]
pub struct TaskInputs {
    /// The task input values, keyed by input name (insertion order is
    /// preserved by `IndexMap`).
    inputs: IndexMap<String, Value>,
    /// Overridden requirement values, keyed by requirement name.
    requirements: HashMap<String, Value>,
    /// Overridden hint values, keyed by hint name.
    hints: HashMap<String, Value>,
}
73
74impl TaskInputs {
75 pub fn iter(&self) -> impl Iterator<Item = (&str, &Value)> + use<'_> {
77 self.inputs.iter().map(|(k, v)| (k.as_str(), v))
78 }
79
80 pub fn get(&self, name: &str) -> Option<&Value> {
82 self.inputs.get(name)
83 }
84
85 pub fn set(&mut self, name: impl Into<String>, value: impl Into<Value>) -> Option<Value> {
89 self.inputs.insert(name.into(), value.into())
90 }
91
92 pub fn requirement(&self, name: &str) -> Option<&Value> {
94 self.requirements.get(name)
95 }
96
97 pub fn override_requirement(&mut self, name: impl Into<String>, value: impl Into<Value>) {
99 self.requirements.insert(name.into(), value.into());
100 }
101
102 pub fn hint(&self, name: &str) -> Option<&Value> {
104 self.hints.get(name)
105 }
106
107 pub fn override_hint(&mut self, name: impl Into<String>, value: impl Into<Value>) {
109 self.hints.insert(name.into(), value.into());
110 }
111
112 pub async fn join_paths<'a>(
118 &mut self,
119 task: &Task,
120 path: impl Fn(&str) -> Result<&'a EvaluationPath>,
121 ) -> Result<()> {
122 for (name, value) in self.inputs.iter_mut() {
123 let Some(ty) = task.inputs().get(name).map(|input| input.ty().clone()) else {
124 bail!("could not find an expected type for input {name}");
125 };
126
127 let base_dir = path(name)?;
128
129 if let Ok(v) = value.coerce(None, &ty) {
130 *value = v
131 .resolve_paths(ty.is_optional(), None, None, &|path| path.expand(base_dir))
132 .await?;
133 }
134 }
135 Ok(())
136 }
137
138 pub fn validate(
143 &self,
144 document: &Document,
145 task: &Task,
146 specified: Option<&HashSet<String>>,
147 ) -> Result<()> {
148 let version = document.version().context("missing document version")?;
149
150 for (name, value) in &self.inputs {
152 let input = task
153 .inputs()
154 .get(name)
155 .with_context(|| format!("unknown input `{name}`"))?;
156
157 check_input_type(document, name, input, value)?;
158 }
159
160 for (name, input) in task.inputs() {
162 if input.required()
163 && !self.inputs.contains_key(name)
164 && specified.map(|s| !s.contains(name)).unwrap_or(true)
165 {
166 bail!(
167 "missing required input `{name}` to task `{task}`",
168 task = task.name()
169 );
170 }
171 }
172
173 for (name, value) in &self.requirements {
175 let ty = value.ty();
176 if let Some(expected) = task_requirement_types(version, name.as_str()) {
177 if !expected.iter().any(|target| ty.is_coercible_to(target)) {
178 bail!(
179 "expected {expected} for requirement `{name}`, but found type `{ty}`",
180 expected = display_types(expected),
181 );
182 }
183
184 continue;
185 }
186
187 bail!("unsupported requirement `{name}`");
188 }
189
190 for (name, value) in &self.hints {
192 let ty = value.ty();
193 if let Some(expected) = task_hint_types(version, name.as_str(), false)
194 && !expected.iter().any(|target| ty.is_coercible_to(target))
195 {
196 bail!(
197 "expected {expected} for hint `{name}`, but found type `{ty}`",
198 expected = display_types(expected),
199 );
200 }
201 }
202
203 Ok(())
204 }
205
206 fn set_path_value(
213 &mut self,
214 document: &Document,
215 task: &Task,
216 path: &str,
217 value: Value,
218 ) -> Result<()> {
219 let version = document.version().expect("document should have a version");
220
221 match path.split_once('.') {
222 Some((key, remainder)) => {
224 let (must_match, matched) = match key {
225 "runtime" => (
226 false,
227 task_requirement_types(version, remainder)
228 .map(|types| (true, types))
229 .or_else(|| {
230 task_hint_types(version, remainder, false)
231 .map(|types| (false, types))
232 }),
233 ),
234 "requirements" => (
235 true,
236 task_requirement_types(version, remainder).map(|types| (true, types)),
237 ),
238 "hints" => (
239 false,
240 task_hint_types(version, remainder, false).map(|types| (false, types)),
241 ),
242 _ => {
243 bail!(
244 "task `{task}` does not have an input named `{path}`",
245 task = task.name()
246 );
247 }
248 };
249
250 if let Some((requirement, expected)) = matched {
251 for ty in expected {
252 if value.ty().is_coercible_to(ty) {
253 if requirement {
254 self.requirements.insert(remainder.to_string(), value);
255 } else {
256 self.hints.insert(remainder.to_string(), value);
257 }
258 return Ok(());
259 }
260 }
261
262 bail!(
263 "expected {expected} for {key} key `{remainder}`, but found type `{ty}`",
264 expected = display_types(expected),
265 ty = value.ty()
266 );
267 } else if must_match {
268 bail!("unsupported {key} key `{remainder}`");
269 } else {
270 Ok(())
271 }
272 }
273 None => {
275 let input = task.inputs().get(path).with_context(|| {
276 format!(
277 "task `{name}` does not have an input named `{path}`",
278 name = task.name()
279 )
280 })?;
281
282 let actual = value.ty();
284 let expected = input.ty();
285 if let Some(PrimitiveType::String) = expected.as_primitive()
286 && let Some(actual) = actual.as_primitive()
287 && actual != PrimitiveType::String
288 {
289 self.inputs
290 .insert(path.to_string(), value.to_string().into());
291 return Ok(());
292 }
293
294 check_input_type(document, path, input, &value)?;
295 self.inputs.insert(path.to_string(), value);
296 Ok(())
297 }
298 }
299 }
300}
301
302impl<S, V> FromIterator<(S, V)> for TaskInputs
303where
304 S: Into<String>,
305 V: Into<Value>,
306{
307 fn from_iter<T: IntoIterator<Item = (S, V)>>(iter: T) -> Self {
308 Self {
309 inputs: iter
310 .into_iter()
311 .map(|(k, v)| (k.into(), v.into()))
312 .collect(),
313 requirements: Default::default(),
314 hints: Default::default(),
315 }
316 }
317}
318
319impl Serialize for TaskInputs {
320 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
321 where
322 S: serde::Serializer,
323 {
324 let mut map = serializer.serialize_map(Some(self.inputs.len()))?;
326 for (k, v) in &self.inputs {
327 let serialized_value = crate::ValueSerializer::new(None, v, true);
328 map.serialize_entry(k, &serialized_value)?;
329 }
330 map.end()
331 }
332}
333
/// The inputs to a workflow, including any nested inputs for its calls.
#[derive(Default, Debug, Clone)]
pub struct WorkflowInputs {
    /// The workflow input values, keyed by input name (insertion order is
    /// preserved by `IndexMap`).
    inputs: IndexMap<String, Value>,
    /// Nested inputs for the workflow's calls, keyed by call name.
    calls: HashMap<String, Inputs>,
}
342
343impl WorkflowInputs {
344 pub fn iter(&self) -> impl Iterator<Item = (&str, &Value)> + use<'_> {
346 self.inputs.iter().map(|(k, v)| (k.as_str(), v))
347 }
348
349 pub fn get(&self, name: &str) -> Option<&Value> {
351 self.inputs.get(name)
352 }
353
354 pub fn calls(&self) -> &HashMap<String, Inputs> {
356 &self.calls
357 }
358
359 pub fn calls_mut(&mut self) -> &mut HashMap<String, Inputs> {
361 &mut self.calls
362 }
363
364 pub fn set(&mut self, name: impl Into<String>, value: impl Into<Value>) -> Option<Value> {
368 self.inputs.insert(name.into(), value.into())
369 }
370
371 pub fn contains(&self, name: &str) -> bool {
375 self.inputs.contains_key(name)
376 }
377
378 pub async fn join_paths<'a>(
384 &mut self,
385 workflow: &Workflow,
386 path: impl Fn(&str) -> Result<&'a EvaluationPath>,
387 ) -> Result<()> {
388 for (name, value) in self.inputs.iter_mut() {
389 let Some(ty) = workflow.inputs().get(name).map(|input| input.ty().clone()) else {
390 bail!("could not find an expected type for input {name}");
391 };
392
393 let base_dir = path(name)?;
394
395 if let Ok(v) = value.coerce(None, &ty) {
396 *value = v
397 .resolve_paths(ty.is_optional(), None, None, &|path| path.expand(base_dir))
398 .await?;
399 }
400 }
401 Ok(())
402 }
403
404 pub fn validate(
409 &self,
410 document: &Document,
411 workflow: &Workflow,
412 specified: Option<&HashSet<String>>,
413 ) -> Result<()> {
414 for (name, value) in &self.inputs {
416 let input = workflow
417 .inputs()
418 .get(name)
419 .with_context(|| format!("unknown input `{name}`"))?;
420 check_input_type(document, name, input, value)?;
421 }
422
423 for (name, input) in workflow.inputs() {
425 if input.required()
426 && !self.inputs.contains_key(name)
427 && specified.map(|s| !s.contains(name)).unwrap_or(true)
428 {
429 bail!(
430 "missing required input `{name}` to workflow `{workflow}`",
431 workflow = workflow.name()
432 );
433 }
434 }
435
436 if !self.calls.is_empty() && !workflow.allows_nested_inputs() {
438 bail!(
439 "cannot specify a nested call input for workflow `{name}` as it does not allow \
440 nested inputs",
441 name = workflow.name()
442 );
443 }
444
445 for (name, inputs) in &self.calls {
447 let call = workflow.calls().get(name).with_context(|| {
448 format!(
449 "workflow `{workflow}` does not have a call named `{name}`",
450 workflow = workflow.name()
451 )
452 })?;
453
454 let document = call
457 .namespace()
458 .map(|ns| {
459 document
460 .namespace(ns)
461 .expect("namespace should be present")
462 .document()
463 })
464 .unwrap_or(document);
465
466 let inputs = match call.kind() {
468 CallKind::Task => {
469 let task = document
470 .task_by_name(call.name())
471 .expect("task should be present");
472
473 let task_inputs = inputs.as_task_inputs().with_context(|| {
474 format!("`{name}` is a call to a task, but workflow inputs were supplied")
475 })?;
476
477 task_inputs.validate(document, task, Some(call.specified()))?;
478 &task_inputs.inputs
479 }
480 CallKind::Workflow => {
481 let workflow = document.workflow().expect("should have a workflow");
482 assert_eq!(
483 workflow.name(),
484 call.name(),
485 "call name does not match workflow name"
486 );
487 let workflow_inputs = inputs.as_workflow_inputs().with_context(|| {
488 format!("`{name}` is a call to a workflow, but task inputs were supplied")
489 })?;
490
491 workflow_inputs.validate(document, workflow, Some(call.specified()))?;
492 &workflow_inputs.inputs
493 }
494 };
495
496 for input in inputs.keys() {
497 if call.specified().contains(input) {
498 bail!(
499 "cannot specify nested input `{input}` for call `{call}` as it was \
500 explicitly specified in the call itself",
501 call = call.name(),
502 );
503 }
504 }
505 }
506
507 if workflow.allows_nested_inputs() {
509 for (call, ty) in workflow.calls() {
510 let inputs = self.calls.get(call);
511
512 for (input, _) in ty
513 .inputs()
514 .iter()
515 .filter(|(n, i)| i.required() && !ty.specified().contains(*n))
516 {
517 if !inputs.map(|i| i.get(input).is_some()).unwrap_or(false) {
518 bail!("missing required input `{input}` for call `{call}`");
519 }
520 }
521 }
522 }
523
524 Ok(())
525 }
526
527 fn set_path_value(
534 &mut self,
535 document: &Document,
536 workflow: &Workflow,
537 path: &str,
538 value: Value,
539 ) -> Result<()> {
540 match path.split_once('.') {
541 Some((name, remainder)) => {
542 if !workflow.allows_nested_inputs() {
544 bail!(
545 "cannot specify a nested call input for workflow `{workflow}` as it does \
546 not allow nested inputs",
547 workflow = workflow.name()
548 );
549 }
550
551 let call = workflow.calls().get(name).with_context(|| {
553 format!(
554 "workflow `{workflow}` does not have a call named `{name}`",
555 workflow = workflow.name()
556 )
557 })?;
558
559 let inputs =
561 self.calls
562 .entry(name.to_string())
563 .or_insert_with(|| match call.kind() {
564 CallKind::Task => Inputs::Task(Default::default()),
565 CallKind::Workflow => Inputs::Workflow(Default::default()),
566 });
567
568 let document = call
571 .namespace()
572 .map(|ns| {
573 document
574 .namespace(ns)
575 .expect("namespace should be present")
576 .document()
577 })
578 .unwrap_or(document);
579
580 let next = remainder
581 .split_once('.')
582 .map(|(n, _)| n)
583 .unwrap_or(remainder);
584 if call.specified().contains(next) {
585 bail!(
586 "cannot specify nested input `{next}` for call `{name}` as it was \
587 explicitly specified in the call itself",
588 );
589 }
590
591 match call.kind() {
593 CallKind::Task => {
594 let task = document
595 .task_by_name(call.name())
596 .expect("task should be present");
597 inputs
598 .as_task_inputs_mut()
599 .expect("should be a task input")
600 .set_path_value(document, task, remainder, value)
601 }
602 CallKind::Workflow => {
603 let workflow = document.workflow().expect("should have a workflow");
604 assert_eq!(
605 workflow.name(),
606 call.name(),
607 "call name does not match workflow name"
608 );
609 inputs
610 .as_workflow_inputs_mut()
611 .expect("should be a task input")
612 .set_path_value(document, workflow, remainder, value)
613 }
614 }
615 }
616 None => {
617 let input = workflow.inputs().get(path).with_context(|| {
618 format!(
619 "workflow `{workflow}` does not have an input named `{path}`",
620 workflow = workflow.name()
621 )
622 })?;
623
624 let actual = value.ty();
626 let expected = input.ty();
627 if let Some(PrimitiveType::String) = expected.as_primitive()
628 && let Some(actual) = actual.as_primitive()
629 && actual != PrimitiveType::String
630 {
631 self.inputs
632 .insert(path.to_string(), value.to_string().into());
633 return Ok(());
634 }
635
636 check_input_type(document, path, input, &value)?;
637 self.inputs.insert(path.to_string(), value);
638 Ok(())
639 }
640 }
641 }
642}
643
644impl<S, V> FromIterator<(S, V)> for WorkflowInputs
645where
646 S: Into<String>,
647 V: Into<Value>,
648{
649 fn from_iter<T: IntoIterator<Item = (S, V)>>(iter: T) -> Self {
650 Self {
651 inputs: iter
652 .into_iter()
653 .map(|(k, v)| (k.into(), v.into()))
654 .collect(),
655 calls: Default::default(),
656 }
657 }
658}
659
660impl Serialize for WorkflowInputs {
661 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
662 where
663 S: serde::Serializer,
664 {
665 let mut map = serializer.serialize_map(Some(self.inputs.len()))?;
668 for (k, v) in &self.inputs {
669 let serialized_value = crate::ValueSerializer::new(None, v, true);
670 map.serialize_entry(k, &serialized_value)?;
671 }
672 map.end()
673 }
674}
675
/// Inputs to either a task or a workflow.
#[derive(Debug, Clone)]
pub enum Inputs {
    /// The inputs are for a task.
    Task(TaskInputs),
    /// The inputs are for a workflow.
    Workflow(WorkflowInputs),
}
684
685impl Inputs {
686 pub fn parse(document: &Document, path: impl AsRef<Path>) -> Result<Option<(String, Self)>> {
700 let path = path.as_ref();
701
702 match path.extension().and_then(|ext| ext.to_str()) {
703 Some("json") => Self::parse_json(document, path),
704 Some("yml") | Some("yaml") => Self::parse_yaml(document, path),
705 ext => bail!(
706 "unsupported file extension: `{ext}`; the supported formats are JSON (`.json`) \
707 and YAML (`.yaml` and `.yml`)",
708 ext = ext.unwrap_or("")
709 ),
710 }
711 .with_context(|| format!("failed to parse input file `{path}`", path = path.display()))
712 }
713
714 pub fn parse_json(
723 document: &Document,
724 path: impl AsRef<Path>,
725 ) -> Result<Option<(String, Self)>> {
726 let path = path.as_ref();
727
728 let file = File::open(path).with_context(|| {
729 format!("failed to open input file `{path}`", path = path.display())
730 })?;
731
732 let reader = BufReader::new(file);
734
735 let map = std::mem::take(
736 serde_json::from_reader::<_, JsonValue>(reader)?
737 .as_object_mut()
738 .with_context(|| {
739 format!(
740 "expected input file `{path}` to contain a JSON object",
741 path = path.display()
742 )
743 })?,
744 );
745
746 Self::parse_json_object(document, map)
747 }
748
749 pub fn parse_yaml(
758 document: &Document,
759 path: impl AsRef<Path>,
760 ) -> Result<Option<(String, Self)>> {
761 let path = path.as_ref();
762
763 let file = File::open(path).with_context(|| {
764 format!("failed to open input file `{path}`", path = path.display())
765 })?;
766
767 let reader = BufReader::new(file);
769 let yaml = serde_yaml_ng::from_reader::<_, YamlValue>(reader)?;
770
771 let mut json = serde_json::to_value(yaml).with_context(|| {
773 format!(
774 "failed to convert YAML to JSON for processing `{path}`",
775 path = path.display()
776 )
777 })?;
778
779 let object = std::mem::take(json.as_object_mut().with_context(|| {
780 format!(
781 "expected input file `{path}` to contain a YAML mapping",
782 path = path.display()
783 )
784 })?);
785
786 Self::parse_json_object(document, object)
787 }
788
789 pub fn get(&self, name: &str) -> Option<&Value> {
791 match self {
792 Self::Task(t) => t.inputs.get(name),
793 Self::Workflow(w) => w.inputs.get(name),
794 }
795 }
796
797 pub fn set(&mut self, name: impl Into<String>, value: impl Into<Value>) -> Option<Value> {
801 match self {
802 Self::Task(inputs) => inputs.set(name, value),
803 Self::Workflow(inputs) => inputs.set(name, value),
804 }
805 }
806
807 pub fn as_task_inputs(&self) -> Option<&TaskInputs> {
811 match self {
812 Self::Task(inputs) => Some(inputs),
813 Self::Workflow(_) => None,
814 }
815 }
816
817 pub fn as_task_inputs_mut(&mut self) -> Option<&mut TaskInputs> {
821 match self {
822 Self::Task(inputs) => Some(inputs),
823 Self::Workflow(_) => None,
824 }
825 }
826
827 pub fn unwrap_task_inputs(self) -> TaskInputs {
833 match self {
834 Self::Task(inputs) => inputs,
835 Self::Workflow(_) => panic!("inputs are for a workflow"),
836 }
837 }
838
839 pub fn as_workflow_inputs(&self) -> Option<&WorkflowInputs> {
843 match self {
844 Self::Task(_) => None,
845 Self::Workflow(inputs) => Some(inputs),
846 }
847 }
848
849 pub fn as_workflow_inputs_mut(&mut self) -> Option<&mut WorkflowInputs> {
853 match self {
854 Self::Task(_) => None,
855 Self::Workflow(inputs) => Some(inputs),
856 }
857 }
858
859 pub fn unwrap_workflow_inputs(self) -> WorkflowInputs {
865 match self {
866 Self::Task(_) => panic!("inputs are for a task"),
867 Self::Workflow(inputs) => inputs,
868 }
869 }
870
871 pub fn parse_json_object(
877 document: &Document,
878 object: JsonMap,
879 ) -> Result<Option<(String, Self)>> {
880 if object.is_empty() {
882 return Ok(None);
883 }
884
885 let mut entrypoint_candidates = BTreeSet::new();
888 for key in object.keys() {
889 let Some((prefix, _)) = key.split_once('.') else {
890 bail!(
891 "invalid input key `{key}`: expected the key to be prefixed with the workflow \
892 or task name",
893 )
894 };
895 entrypoint_candidates.insert(prefix);
896 }
897
898 let entrypoint_name = match entrypoint_candidates
901 .iter()
902 .take(2)
903 .collect::<Vec<_>>()
904 .as_slice()
905 {
906 [] => panic!("no entrypoint candidates for inputs; report this as a bug"),
907 [entrypoint_name] => entrypoint_name.to_string(),
908 _ => bail!(
909 "invalid inputs: expected each input key to be prefixed with the same workflow or \
910 task name, but found multiple prefixes: {entrypoint_candidates:?}",
911 ),
912 };
913
914 let inputs = match (document.task_by_name(&entrypoint_name), document.workflow()) {
915 (Some(task), _) => Self::parse_task_inputs(document, task, object)?,
916 (None, Some(workflow)) if workflow.name() == entrypoint_name => {
917 Self::parse_workflow_inputs(document, workflow, object)?
918 }
919 _ => bail!(
920 "invalid inputs: a task or workflow named `{entrypoint_name}` does not exist in \
921 the document"
922 ),
923 };
924 Ok(Some((entrypoint_name, inputs)))
925 }
926
927 fn parse_task_inputs(document: &Document, task: &Task, object: JsonMap) -> Result<Self> {
929 let mut inputs = TaskInputs::default();
930 for (key, value) in object {
931 let value = serde_json::from_value(value)
933 .with_context(|| format!("invalid input value for key `{key}`"))?;
934
935 match key.split_once(".") {
936 Some((prefix, remainder)) if prefix == task.name() => {
937 inputs
938 .set_path_value(document, task, remainder, value)
939 .with_context(|| format!("invalid input key `{key}`"))?;
940 }
941 _ => {
942 bail!(
946 "invalid input key `{key}`: expected key to be prefixed with `{task}`",
947 task = task.name()
948 );
949 }
950 }
951 }
952
953 Ok(Inputs::Task(inputs))
954 }
955
956 fn parse_workflow_inputs(
958 document: &Document,
959 workflow: &Workflow,
960 object: JsonMap,
961 ) -> Result<Self> {
962 let mut inputs = WorkflowInputs::default();
963 for (key, value) in object {
964 let value = serde_json::from_value(value)
966 .with_context(|| format!("invalid input value for key `{key}`"))?;
967
968 match key.split_once(".") {
969 Some((prefix, remainder)) if prefix == workflow.name() => {
970 inputs
971 .set_path_value(document, workflow, remainder, value)
972 .with_context(|| format!("invalid input key `{key}`"))?;
973 }
974 _ => {
975 bail!(
979 "invalid input key `{key}`: expected key to be prefixed with `{workflow}`",
980 workflow = workflow.name()
981 );
982 }
983 }
984 }
985
986 Ok(Inputs::Workflow(inputs))
987 }
988}
989
990impl From<TaskInputs> for Inputs {
991 fn from(inputs: TaskInputs) -> Self {
992 Self::Task(inputs)
993 }
994}
995
996impl From<WorkflowInputs> for Inputs {
997 fn from(inputs: WorkflowInputs) -> Self {
998 Self::Workflow(inputs)
999 }
1000}