1use std::collections::HashMap;
4use std::collections::HashSet;
5use std::fs::File;
6use std::io::BufReader;
7use std::path::Path;
8
9use anyhow::Context;
10use anyhow::Result;
11use anyhow::bail;
12use indexmap::IndexMap;
13use serde::Serialize;
14use serde::ser::SerializeMap;
15use serde_json::Value as JsonValue;
16use serde_yaml_ng::Value as YamlValue;
17use wdl_analysis::Document;
18use wdl_analysis::document::Task;
19use wdl_analysis::document::Workflow;
20use wdl_analysis::types::CallKind;
21use wdl_analysis::types::Coercible as _;
22use wdl_analysis::types::PrimitiveType;
23use wdl_analysis::types::Type;
24use wdl_analysis::types::display_types;
25use wdl_analysis::types::v1::task_hint_types;
26use wdl_analysis::types::v1::task_requirement_types;
27
28use crate::Coercible;
29use crate::Value;
30
31pub type JsonMap = serde_json::Map<String, JsonValue>;
33
/// Coerces each input to its declared type and joins the path values it contains to a
/// per-input path.
///
/// For every `(name, value)` entry in `inputs`:
///
/// * `ty(name)` supplies the declared type; entries for which it returns `None` are
///   skipped and left untouched.
/// * `path(name)` supplies the path that the value's paths are joined to; an error from
///   it is returned immediately.
/// * The value is coerced to the declared type. If coercion fails, the original value is
///   put back unchanged. If it succeeds, `expand_path`, `join_path_to`, and
///   `ensure_path_exists(false)` are applied to every path visited by
///   `visit_paths_mut(false, ..)`, and the coerced value replaces the original.
///
/// # Errors
///
/// Returns an error if `path` fails or if expanding or checking any path fails.
///
/// NOTE(review): when path processing fails, the entry currently being processed is left
/// as `Value::None`, because the original value was moved out of the map (and dropped)
/// before the failure and is never written back.
fn join_paths<'a>(
    inputs: &mut IndexMap<String, Value>,
    path: impl Fn(&str) -> Result<&'a Path>,
    ty: impl Fn(&str) -> Option<Type>,
) -> Result<()> {
    for (name, value) in inputs.iter_mut() {
        // Inputs without a known declared type are left as-is.
        let ty = match ty(name) {
            Some(ty) => ty,
            _ => {
                continue;
            }
        };

        let path = path(name)?;

        // Move the value out of the map, leaving `Value::None` in its place until it is
        // written back at the end of this iteration.
        let mut current = std::mem::replace(value, Value::None);
        if let Ok(mut v) = current.coerce(&ty) {
            // NOTE(review): `current` is dropped before `v` is mutated, presumably so that
            // any data `v` shares with it is no longer shared and need not be cloned by
            // the mutation below -- confirm against `Value`'s representation.
            drop(current);
            v.visit_paths_mut(false, &mut |_, v| {
                v.expand_path()?;
                v.join_path_to(path);
                v.ensure_path_exists(false)
            })?;
            current = v;
        }

        // Write back either the coerced value or, if coercion failed, the original.
        *value = current;
    }

    Ok(())
}
70
/// Inputs to a task, along with any requirement and hint values supplied for it.
#[derive(Default, Debug, Clone)]
pub struct TaskInputs {
    /// The task input values, keyed by input name, in insertion order.
    inputs: IndexMap<String, Value>,
    /// Requirement values keyed by requirement name (set via `override_requirement` or
    /// from `requirements.*` / `runtime.*` keys of an inputs file).
    requirements: HashMap<String, Value>,
    /// Hint values keyed by hint name (set via `override_hint` or from `hints.*` /
    /// `runtime.*` keys of an inputs file).
    hints: HashMap<String, Value>,
}
81
82impl TaskInputs {
83 pub fn iter(&self) -> impl Iterator<Item = (&str, &Value)> + use<'_> {
85 self.inputs.iter().map(|(k, v)| (k.as_str(), v))
86 }
87
88 pub fn get(&self, name: &str) -> Option<&Value> {
90 self.inputs.get(name)
91 }
92
93 pub fn set(&mut self, name: impl Into<String>, value: impl Into<Value>) -> Option<Value> {
97 self.inputs.insert(name.into(), value.into())
98 }
99
100 pub fn requirement(&self, name: &str) -> Option<&Value> {
102 self.requirements.get(name)
103 }
104
105 pub fn override_requirement(&mut self, name: impl Into<String>, value: impl Into<Value>) {
107 self.requirements.insert(name.into(), value.into());
108 }
109
110 pub fn hint(&self, name: &str) -> Option<&Value> {
112 self.hints.get(name)
113 }
114
115 pub fn override_hint(&mut self, name: impl Into<String>, value: impl Into<Value>) {
117 self.hints.insert(name.into(), value.into());
118 }
119
120 pub fn join_paths<'a>(
126 &mut self,
127 task: &Task,
128 path: impl Fn(&str) -> Result<&'a Path>,
129 ) -> Result<()> {
130 join_paths(&mut self.inputs, path, |name| {
131 task.inputs().get(name).map(|input| input.ty().clone())
132 })
133 }
134
135 pub fn validate(
140 &self,
141 document: &Document,
142 task: &Task,
143 specified: Option<&HashSet<String>>,
144 ) -> Result<()> {
145 let version = document.version().context("missing document version")?;
146
147 for (name, value) in &self.inputs {
149 let input = task
150 .inputs()
151 .get(name)
152 .with_context(|| format!("unknown input `{name}`"))?;
153 let ty = value.ty();
154 if !ty.is_coercible_to(input.ty()) {
155 bail!(
156 "expected type `{expected_ty}` for input `{name}`, but found `{ty}`",
157 expected_ty = input.ty(),
158 );
159 }
160 }
161
162 for (name, input) in task.inputs() {
164 if input.required()
165 && !self.inputs.contains_key(name)
166 && specified.map(|s| !s.contains(name)).unwrap_or(true)
167 {
168 bail!(
169 "missing required input `{name}` to task `{task}`",
170 task = task.name()
171 );
172 }
173 }
174
175 for (name, value) in &self.requirements {
177 let ty = value.ty();
178 if let Some(expected) = task_requirement_types(version, name.as_str()) {
179 if !expected.iter().any(|target| ty.is_coercible_to(target)) {
180 bail!(
181 "expected {expected} for requirement `{name}`, but found type `{ty}`",
182 expected = display_types(expected),
183 );
184 }
185
186 continue;
187 }
188
189 bail!("unsupported requirement `{name}`");
190 }
191
192 for (name, value) in &self.hints {
194 let ty = value.ty();
195 if let Some(expected) = task_hint_types(version, name.as_str(), false)
196 && !expected.iter().any(|target| ty.is_coercible_to(target))
197 {
198 bail!(
199 "expected {expected} for hint `{name}`, but found type `{ty}`",
200 expected = display_types(expected),
201 );
202 }
203 }
204
205 Ok(())
206 }
207
208 fn set_path_value(
215 &mut self,
216 document: &Document,
217 task: &Task,
218 path: &str,
219 value: Value,
220 ) -> Result<()> {
221 let version = document.version().expect("document should have a version");
222
223 match path.split_once('.') {
224 Some((key, remainder)) => {
226 let (must_match, matched) = match key {
227 "runtime" => (
228 false,
229 task_requirement_types(version, remainder)
230 .map(|types| (true, types))
231 .or_else(|| {
232 task_hint_types(version, remainder, false)
233 .map(|types| (false, types))
234 }),
235 ),
236 "requirements" => (
237 true,
238 task_requirement_types(version, remainder).map(|types| (true, types)),
239 ),
240 "hints" => (
241 false,
242 task_hint_types(version, remainder, false).map(|types| (false, types)),
243 ),
244 _ => {
245 bail!(
246 "task `{task}` does not have an input named `{path}`",
247 task = task.name()
248 );
249 }
250 };
251
252 if let Some((requirement, expected)) = matched {
253 for ty in expected {
254 if value.ty().is_coercible_to(ty) {
255 if requirement {
256 self.requirements.insert(remainder.to_string(), value);
257 } else {
258 self.hints.insert(remainder.to_string(), value);
259 }
260 return Ok(());
261 }
262 }
263
264 bail!(
265 "expected {expected} for {key} key `{remainder}`, but found type `{ty}`",
266 expected = display_types(expected),
267 ty = value.ty()
268 );
269 } else if must_match {
270 bail!("unsupported {key} key `{remainder}`");
271 } else {
272 Ok(())
273 }
274 }
275 None => {
277 let input = task.inputs().get(path).with_context(|| {
278 format!(
279 "task `{name}` does not have an input named `{path}`",
280 name = task.name()
281 )
282 })?;
283
284 let actual = value.ty();
285 let expected = input.ty();
286 if let Some(expected_prim_ty) = expected.as_primitive()
287 && expected_prim_ty == PrimitiveType::String
288 && let Some(actual_prim_ty) = actual.as_primitive()
289 && actual_prim_ty != PrimitiveType::String
290 {
291 self.inputs
292 .insert(path.to_string(), value.to_string().into());
293 return Ok(());
294 }
295 if !actual.is_coercible_to(expected) {
296 bail!(
297 "expected type `{expected}` for input `{path}`, but found type `{actual}`",
298 );
299 }
300 self.inputs.insert(path.to_string(), value);
301 Ok(())
302 }
303 }
304 }
305}
306
307impl<S, V> FromIterator<(S, V)> for TaskInputs
308where
309 S: Into<String>,
310 V: Into<Value>,
311{
312 fn from_iter<T: IntoIterator<Item = (S, V)>>(iter: T) -> Self {
313 Self {
314 inputs: iter
315 .into_iter()
316 .map(|(k, v)| (k.into(), v.into()))
317 .collect(),
318 requirements: Default::default(),
319 hints: Default::default(),
320 }
321 }
322}
323
324impl Serialize for TaskInputs {
325 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
326 where
327 S: serde::Serializer,
328 {
329 let mut map = serializer.serialize_map(Some(self.inputs.len()))?;
331 for (k, v) in &self.inputs {
332 let serialized_value = crate::ValueSerializer::new(v, true);
333 map.serialize_entry(k, &serialized_value)?;
334 }
335 map.end()
336 }
337}
338
/// Inputs to a workflow, along with any nested inputs supplied for its calls.
#[derive(Default, Debug, Clone)]
pub struct WorkflowInputs {
    /// The workflow input values, keyed by input name, in insertion order.
    inputs: IndexMap<String, Value>,
    /// Nested inputs for the workflow's calls, keyed by call name.
    calls: HashMap<String, Inputs>,
}
347
348impl WorkflowInputs {
349 pub fn iter(&self) -> impl Iterator<Item = (&str, &Value)> + use<'_> {
351 self.inputs.iter().map(|(k, v)| (k.as_str(), v))
352 }
353
354 pub fn get(&self, name: &str) -> Option<&Value> {
356 self.inputs.get(name)
357 }
358
359 pub fn calls(&self) -> &HashMap<String, Inputs> {
361 &self.calls
362 }
363
364 pub fn calls_mut(&mut self) -> &mut HashMap<String, Inputs> {
366 &mut self.calls
367 }
368
369 pub fn set(&mut self, name: impl Into<String>, value: impl Into<Value>) -> Option<Value> {
373 self.inputs.insert(name.into(), value.into())
374 }
375
376 pub fn contains(&self, name: &str) -> bool {
380 self.inputs.contains_key(name)
381 }
382
383 pub fn join_paths<'a>(
389 &mut self,
390 workflow: &Workflow,
391 path: impl Fn(&str) -> Result<&'a Path>,
392 ) -> Result<()> {
393 join_paths(&mut self.inputs, path, |name| {
394 workflow.inputs().get(name).map(|input| input.ty().clone())
395 })
396 }
397
398 pub fn validate(
403 &self,
404 document: &Document,
405 workflow: &Workflow,
406 specified: Option<&HashSet<String>>,
407 ) -> Result<()> {
408 for (name, value) in &self.inputs {
410 let input = workflow
411 .inputs()
412 .get(name)
413 .with_context(|| format!("unknown input `{name}`"))?;
414 let expected_ty = input.ty();
415 let ty = value.ty();
416 if !ty.is_coercible_to(expected_ty) {
417 bail!("expected type `{expected_ty}` for input `{name}`, but found type `{ty}`");
418 }
419 }
420
421 for (name, input) in workflow.inputs() {
423 if input.required()
424 && !self.inputs.contains_key(name)
425 && specified.map(|s| !s.contains(name)).unwrap_or(true)
426 {
427 bail!(
428 "missing required input `{name}` to workflow `{workflow}`",
429 workflow = workflow.name()
430 );
431 }
432 }
433
434 if !self.calls.is_empty() && !workflow.allows_nested_inputs() {
436 bail!(
437 "cannot specify a nested call input for workflow `{name}` as it does not allow \
438 nested inputs",
439 name = workflow.name()
440 );
441 }
442
443 for (name, inputs) in &self.calls {
445 let call = workflow.calls().get(name).with_context(|| {
446 format!(
447 "workflow `{workflow}` does not have a call named `{name}`",
448 workflow = workflow.name()
449 )
450 })?;
451
452 let document = call
455 .namespace()
456 .map(|ns| {
457 document
458 .namespace(ns)
459 .expect("namespace should be present")
460 .document()
461 })
462 .unwrap_or(document);
463
464 let inputs = match call.kind() {
466 CallKind::Task => {
467 let task = document
468 .task_by_name(call.name())
469 .expect("task should be present");
470
471 let task_inputs = inputs.as_task_inputs().with_context(|| {
472 format!("`{name}` is a call to a task, but workflow inputs were supplied")
473 })?;
474
475 task_inputs.validate(document, task, Some(call.specified()))?;
476 &task_inputs.inputs
477 }
478 CallKind::Workflow => {
479 let workflow = document.workflow().expect("should have a workflow");
480 assert_eq!(
481 workflow.name(),
482 call.name(),
483 "call name does not match workflow name"
484 );
485 let workflow_inputs = inputs.as_workflow_inputs().with_context(|| {
486 format!("`{name}` is a call to a workflow, but task inputs were supplied")
487 })?;
488
489 workflow_inputs.validate(document, workflow, Some(call.specified()))?;
490 &workflow_inputs.inputs
491 }
492 };
493
494 for input in inputs.keys() {
495 if call.specified().contains(input) {
496 bail!(
497 "cannot specify nested input `{input}` for call `{call}` as it was \
498 explicitly specified in the call itself",
499 call = call.name(),
500 );
501 }
502 }
503 }
504
505 if workflow.allows_nested_inputs() {
507 for (call, ty) in workflow.calls() {
508 let inputs = self.calls.get(call);
509
510 for (input, _) in ty
511 .inputs()
512 .iter()
513 .filter(|(n, i)| i.required() && !ty.specified().contains(*n))
514 {
515 if !inputs.map(|i| i.get(input).is_some()).unwrap_or(false) {
516 bail!("missing required input `{input}` for call `{call}`");
517 }
518 }
519 }
520 }
521
522 Ok(())
523 }
524
525 fn set_path_value(
532 &mut self,
533 document: &Document,
534 workflow: &Workflow,
535 path: &str,
536 value: Value,
537 ) -> Result<()> {
538 match path.split_once('.') {
539 Some((name, remainder)) => {
540 if !workflow.allows_nested_inputs() {
542 bail!(
543 "cannot specify a nested call input for workflow `{workflow}` as it does \
544 not allow nested inputs",
545 workflow = workflow.name()
546 );
547 }
548
549 let call = workflow.calls().get(name).with_context(|| {
551 format!(
552 "workflow `{workflow}` does not have a call named `{name}`",
553 workflow = workflow.name()
554 )
555 })?;
556
557 let inputs =
559 self.calls
560 .entry(name.to_string())
561 .or_insert_with(|| match call.kind() {
562 CallKind::Task => Inputs::Task(Default::default()),
563 CallKind::Workflow => Inputs::Workflow(Default::default()),
564 });
565
566 let document = call
569 .namespace()
570 .map(|ns| {
571 document
572 .namespace(ns)
573 .expect("namespace should be present")
574 .document()
575 })
576 .unwrap_or(document);
577
578 let next = remainder
579 .split_once('.')
580 .map(|(n, _)| n)
581 .unwrap_or(remainder);
582 if call.specified().contains(next) {
583 bail!(
584 "cannot specify nested input `{next}` for call `{name}` as it was \
585 explicitly specified in the call itself",
586 );
587 }
588
589 match call.kind() {
591 CallKind::Task => {
592 let task = document
593 .task_by_name(call.name())
594 .expect("task should be present");
595 inputs
596 .as_task_inputs_mut()
597 .expect("should be a task input")
598 .set_path_value(document, task, remainder, value)
599 }
600 CallKind::Workflow => {
601 let workflow = document.workflow().expect("should have a workflow");
602 assert_eq!(
603 workflow.name(),
604 call.name(),
605 "call name does not match workflow name"
606 );
607 inputs
608 .as_workflow_inputs_mut()
609 .expect("should be a task input")
610 .set_path_value(document, workflow, remainder, value)
611 }
612 }
613 }
614 None => {
615 let input = workflow.inputs().get(path).with_context(|| {
616 format!(
617 "workflow `{workflow}` does not have an input named `{path}`",
618 workflow = workflow.name()
619 )
620 })?;
621
622 let expected = input.ty();
623 let actual = value.ty();
624 if let Some(expected_prim_ty) = expected.as_primitive()
625 && expected_prim_ty == PrimitiveType::String
626 && let Some(actual_prim_ty) = actual.as_primitive()
627 && actual_prim_ty != PrimitiveType::String
628 {
629 self.inputs
630 .insert(path.to_string(), value.to_string().into());
631 return Ok(());
632 }
633 if !actual.is_coercible_to(expected) {
634 bail!(
635 "expected type `{expected}` for input `{path}`, but found type `{actual}`"
636 );
637 }
638 self.inputs.insert(path.to_string(), value);
639 Ok(())
640 }
641 }
642 }
643}
644
645impl<S, V> FromIterator<(S, V)> for WorkflowInputs
646where
647 S: Into<String>,
648 V: Into<Value>,
649{
650 fn from_iter<T: IntoIterator<Item = (S, V)>>(iter: T) -> Self {
651 Self {
652 inputs: iter
653 .into_iter()
654 .map(|(k, v)| (k.into(), v.into()))
655 .collect(),
656 calls: Default::default(),
657 }
658 }
659}
660
661impl Serialize for WorkflowInputs {
662 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
663 where
664 S: serde::Serializer,
665 {
666 let mut map = serializer.serialize_map(Some(self.inputs.len()))?;
669 for (k, v) in &self.inputs {
670 let serialized_value = crate::ValueSerializer::new(v, true);
671 map.serialize_entry(k, &serialized_value)?;
672 }
673 map.end()
674 }
675}
676
/// Inputs to either a task or a workflow.
#[derive(Debug, Clone)]
pub enum Inputs {
    /// The inputs are to a task.
    Task(TaskInputs),
    /// The inputs are to a workflow.
    Workflow(WorkflowInputs),
}
685
686impl Inputs {
687 pub fn parse(document: &Document, path: impl AsRef<Path>) -> Result<Option<(String, Self)>> {
701 let path = path.as_ref();
702
703 match path.extension().and_then(|ext| ext.to_str()) {
704 Some("json") => Self::parse_json(document, path),
705 Some("yml") | Some("yaml") => Self::parse_yaml(document, path),
706 ext => bail!(
707 "unsupported file extension: `{ext}`; the supported formats are JSON (`.json`) \
708 and YAML (`.yaml` and `.yml`)",
709 ext = ext.unwrap_or("")
710 ),
711 }
712 .with_context(|| format!("failed to parse input file `{path}`", path = path.display()))
713 }
714
715 pub fn parse_json(
724 document: &Document,
725 path: impl AsRef<Path>,
726 ) -> Result<Option<(String, Self)>> {
727 let path = path.as_ref();
728
729 let file = File::open(path).with_context(|| {
730 format!("failed to open input file `{path}`", path = path.display())
731 })?;
732
733 let reader = BufReader::new(file);
735
736 let map = std::mem::take(
737 serde_json::from_reader::<_, JsonValue>(reader)?
738 .as_object_mut()
739 .with_context(|| {
740 format!(
741 "expected input file `{path}` to contain a JSON object",
742 path = path.display()
743 )
744 })?,
745 );
746
747 Self::parse_object(document, map)
748 }
749
750 pub fn parse_yaml(
759 document: &Document,
760 path: impl AsRef<Path>,
761 ) -> Result<Option<(String, Self)>> {
762 let path = path.as_ref();
763
764 let file = File::open(path).with_context(|| {
765 format!("failed to open input file `{path}`", path = path.display())
766 })?;
767
768 let reader = BufReader::new(file);
770 let yaml = serde_yaml_ng::from_reader::<_, YamlValue>(reader)?;
771
772 let mut json = serde_json::to_value(yaml).with_context(|| {
774 format!(
775 "failed to convert YAML to JSON for processing `{path}`",
776 path = path.display()
777 )
778 })?;
779
780 let object = std::mem::take(json.as_object_mut().with_context(|| {
781 format!(
782 "expected input file `{path}` to contain a YAML mapping",
783 path = path.display()
784 )
785 })?);
786
787 Self::parse_object(document, object)
788 }
789
790 pub fn get(&self, name: &str) -> Option<&Value> {
792 match self {
793 Self::Task(t) => t.inputs.get(name),
794 Self::Workflow(w) => w.inputs.get(name),
795 }
796 }
797
798 pub fn set(&mut self, name: impl Into<String>, value: impl Into<Value>) -> Option<Value> {
802 match self {
803 Self::Task(inputs) => inputs.set(name, value),
804 Self::Workflow(inputs) => inputs.set(name, value),
805 }
806 }
807
808 pub fn as_task_inputs(&self) -> Option<&TaskInputs> {
812 match self {
813 Self::Task(inputs) => Some(inputs),
814 Self::Workflow(_) => None,
815 }
816 }
817
818 pub fn as_task_inputs_mut(&mut self) -> Option<&mut TaskInputs> {
822 match self {
823 Self::Task(inputs) => Some(inputs),
824 Self::Workflow(_) => None,
825 }
826 }
827
828 pub fn unwrap_task_inputs(self) -> TaskInputs {
834 match self {
835 Self::Task(inputs) => inputs,
836 Self::Workflow(_) => panic!("inputs are for a workflow"),
837 }
838 }
839
840 pub fn as_workflow_inputs(&self) -> Option<&WorkflowInputs> {
844 match self {
845 Self::Task(_) => None,
846 Self::Workflow(inputs) => Some(inputs),
847 }
848 }
849
850 pub fn as_workflow_inputs_mut(&mut self) -> Option<&mut WorkflowInputs> {
854 match self {
855 Self::Task(_) => None,
856 Self::Workflow(inputs) => Some(inputs),
857 }
858 }
859
860 pub fn unwrap_workflow_inputs(self) -> WorkflowInputs {
866 match self {
867 Self::Task(_) => panic!("inputs are for a task"),
868 Self::Workflow(inputs) => inputs,
869 }
870 }
871
872 pub fn parse_object(document: &Document, object: JsonMap) -> Result<Option<(String, Self)>> {
878 let (key, name) = match object.iter().next() {
880 Some((key, _)) => match key.split_once('.') {
881 Some((name, _remainder)) => (key, name),
882 None => {
883 bail!(
884 "invalid input key `{key}`: expected the value to be prefixed with the \
885 workflow or task name",
886 )
887 }
888 },
889 None => {
891 return Ok(None);
892 }
893 };
894
895 match (document.task_by_name(name), document.workflow()) {
896 (Some(task), _) => Ok(Some(Self::parse_task_inputs(document, task, object)?)),
897 (None, Some(workflow)) if workflow.name() == name => Ok(Some(
898 Self::parse_workflow_inputs(document, workflow, object)?,
899 )),
900 _ => bail!(
901 "invalid input key `{key}`: a task or workflow named `{name}` does not exist in \
902 the document"
903 ),
904 }
905 }
906
907 fn parse_task_inputs(
909 document: &Document,
910 task: &Task,
911 object: JsonMap,
912 ) -> Result<(String, Self)> {
913 let mut inputs = TaskInputs::default();
914 for (key, value) in object {
915 let value = serde_json::from_value(value)
917 .with_context(|| format!("invalid input key `{key}`"))?;
918
919 match key.split_once(".") {
920 Some((prefix, remainder)) if prefix == task.name() => {
921 inputs
922 .set_path_value(document, task, remainder, value)
923 .with_context(|| format!("invalid input key `{key}`"))?;
924 }
925 _ => {
926 bail!(
927 "invalid input key `{key}`: expected key to be prefixed with `{task}`",
928 task = task.name()
929 );
930 }
931 }
932 }
933
934 Ok((task.name().to_string(), Inputs::Task(inputs)))
935 }
936
937 fn parse_workflow_inputs(
939 document: &Document,
940 workflow: &Workflow,
941 object: JsonMap,
942 ) -> Result<(String, Self)> {
943 let mut inputs = WorkflowInputs::default();
944 for (key, value) in object {
945 let value = serde_json::from_value(value)
947 .with_context(|| format!("invalid input key `{key}`"))?;
948
949 match key.split_once(".") {
950 Some((prefix, remainder)) if prefix == workflow.name() => {
951 inputs
952 .set_path_value(document, workflow, remainder, value)
953 .with_context(|| format!("invalid input key `{key}`"))?;
954 }
955 _ => {
956 bail!(
957 "invalid input key `{key}`: expected key to be prefixed with `{workflow}`",
958 workflow = workflow.name()
959 );
960 }
961 }
962 }
963
964 Ok((workflow.name().to_string(), Inputs::Workflow(inputs)))
965 }
966}
967
968impl From<TaskInputs> for Inputs {
969 fn from(inputs: TaskInputs) -> Self {
970 Self::Task(inputs)
971 }
972}
973
974impl From<WorkflowInputs> for Inputs {
975 fn from(inputs: WorkflowInputs) -> Self {
976 Self::Workflow(inputs)
977 }
978}