1use std::collections::HashMap;
4use std::collections::HashSet;
5use std::fs::File;
6use std::io::BufReader;
7use std::path::Path;
8
9use anyhow::Context;
10use anyhow::Result;
11use anyhow::bail;
12use indexmap::IndexMap;
13use serde::Serialize;
14use serde::ser::SerializeMap;
15use serde_json::Value as JsonValue;
16use serde_yaml_ng::Value as YamlValue;
17use wdl_analysis::Document;
18use wdl_analysis::document::Input;
19use wdl_analysis::document::Task;
20use wdl_analysis::document::Workflow;
21use wdl_analysis::types::CallKind;
22use wdl_analysis::types::Coercible as _;
23use wdl_analysis::types::Optional;
24use wdl_analysis::types::PrimitiveType;
25use wdl_analysis::types::Type;
26use wdl_analysis::types::display_types;
27use wdl_analysis::types::v1::task_hint_types;
28use wdl_analysis::types::v1::task_requirement_types;
29use wdl_ast::SupportedVersion;
30use wdl_ast::version::V1;
31
32use crate::Coercible;
33use crate::Value;
34use crate::path::EvaluationPath;
35
36pub type JsonMap = serde_json::Map<String, JsonValue>;
38
39fn check_input_type(document: &Document, name: &str, input: &Input, value: &Value) -> Result<()> {
41 let expected_ty = if !input.required()
45 && document
46 .version()
47 .map(|v| v >= SupportedVersion::V1(V1::Two))
48 .unwrap_or(false)
49 {
50 input.ty().optional()
51 } else {
52 input.ty().clone()
53 };
54
55 let ty = value.ty();
56 if !ty.is_coercible_to(&expected_ty) {
57 bail!("expected type `{expected_ty}` for input `{name}`, but found `{ty}`");
58 }
59
60 Ok(())
61}
62
63async fn join_paths<'a>(
66 inputs: &mut IndexMap<String, Value>,
67 base_dir: impl Fn(&str) -> Result<&'a EvaluationPath>,
68 ty: impl Fn(&str) -> Option<Type>,
69) -> Result<()> {
70 for (name, value) in inputs.iter_mut() {
71 let ty = match ty(name) {
72 Some(ty) => ty,
73 _ => {
74 continue;
75 }
76 };
77
78 let base_dir = base_dir(name)?;
79
80 let mut current = std::mem::replace(value, Value::None(value.ty()));
84 if let Ok(mut v) = current.coerce(None, &ty) {
85 drop(current);
86 v.ensure_paths_exist(ty.is_optional(), None, None, &|path| path.expand(base_dir))
87 .await?;
88 current = v;
89 }
90
91 *value = current;
92 }
93
94 Ok(())
95}
96
97#[derive(Default, Debug, Clone)]
99pub struct TaskInputs {
100 inputs: IndexMap<String, Value>,
102 requirements: HashMap<String, Value>,
104 hints: HashMap<String, Value>,
106}
107
108impl TaskInputs {
109 pub fn iter(&self) -> impl Iterator<Item = (&str, &Value)> + use<'_> {
111 self.inputs.iter().map(|(k, v)| (k.as_str(), v))
112 }
113
114 pub fn get(&self, name: &str) -> Option<&Value> {
116 self.inputs.get(name)
117 }
118
119 pub fn set(&mut self, name: impl Into<String>, value: impl Into<Value>) -> Option<Value> {
123 self.inputs.insert(name.into(), value.into())
124 }
125
126 pub fn requirement(&self, name: &str) -> Option<&Value> {
128 self.requirements.get(name)
129 }
130
131 pub fn override_requirement(&mut self, name: impl Into<String>, value: impl Into<Value>) {
133 self.requirements.insert(name.into(), value.into());
134 }
135
136 pub fn hint(&self, name: &str) -> Option<&Value> {
138 self.hints.get(name)
139 }
140
141 pub fn override_hint(&mut self, name: impl Into<String>, value: impl Into<Value>) {
143 self.hints.insert(name.into(), value.into());
144 }
145
146 pub async fn join_paths<'a>(
152 &mut self,
153 task: &Task,
154 path: impl Fn(&str) -> Result<&'a EvaluationPath>,
155 ) -> Result<()> {
156 join_paths(&mut self.inputs, path, |name| {
157 task.inputs().get(name).map(|input| input.ty().clone())
158 })
159 .await
160 }
161
162 pub fn validate(
167 &self,
168 document: &Document,
169 task: &Task,
170 specified: Option<&HashSet<String>>,
171 ) -> Result<()> {
172 let version = document.version().context("missing document version")?;
173
174 for (name, value) in &self.inputs {
176 let input = task
177 .inputs()
178 .get(name)
179 .with_context(|| format!("unknown input `{name}`"))?;
180
181 check_input_type(document, name, input, value)?;
182 }
183
184 for (name, input) in task.inputs() {
186 if input.required()
187 && !self.inputs.contains_key(name)
188 && specified.map(|s| !s.contains(name)).unwrap_or(true)
189 {
190 bail!(
191 "missing required input `{name}` to task `{task}`",
192 task = task.name()
193 );
194 }
195 }
196
197 for (name, value) in &self.requirements {
199 let ty = value.ty();
200 if let Some(expected) = task_requirement_types(version, name.as_str()) {
201 if !expected.iter().any(|target| ty.is_coercible_to(target)) {
202 bail!(
203 "expected {expected} for requirement `{name}`, but found type `{ty}`",
204 expected = display_types(expected),
205 );
206 }
207
208 continue;
209 }
210
211 bail!("unsupported requirement `{name}`");
212 }
213
214 for (name, value) in &self.hints {
216 let ty = value.ty();
217 if let Some(expected) = task_hint_types(version, name.as_str(), false)
218 && !expected.iter().any(|target| ty.is_coercible_to(target))
219 {
220 bail!(
221 "expected {expected} for hint `{name}`, but found type `{ty}`",
222 expected = display_types(expected),
223 );
224 }
225 }
226
227 Ok(())
228 }
229
230 fn set_path_value(
237 &mut self,
238 document: &Document,
239 task: &Task,
240 path: &str,
241 value: Value,
242 ) -> Result<()> {
243 let version = document.version().expect("document should have a version");
244
245 match path.split_once('.') {
246 Some((key, remainder)) => {
248 let (must_match, matched) = match key {
249 "runtime" => (
250 false,
251 task_requirement_types(version, remainder)
252 .map(|types| (true, types))
253 .or_else(|| {
254 task_hint_types(version, remainder, false)
255 .map(|types| (false, types))
256 }),
257 ),
258 "requirements" => (
259 true,
260 task_requirement_types(version, remainder).map(|types| (true, types)),
261 ),
262 "hints" => (
263 false,
264 task_hint_types(version, remainder, false).map(|types| (false, types)),
265 ),
266 _ => {
267 bail!(
268 "task `{task}` does not have an input named `{path}`",
269 task = task.name()
270 );
271 }
272 };
273
274 if let Some((requirement, expected)) = matched {
275 for ty in expected {
276 if value.ty().is_coercible_to(ty) {
277 if requirement {
278 self.requirements.insert(remainder.to_string(), value);
279 } else {
280 self.hints.insert(remainder.to_string(), value);
281 }
282 return Ok(());
283 }
284 }
285
286 bail!(
287 "expected {expected} for {key} key `{remainder}`, but found type `{ty}`",
288 expected = display_types(expected),
289 ty = value.ty()
290 );
291 } else if must_match {
292 bail!("unsupported {key} key `{remainder}`");
293 } else {
294 Ok(())
295 }
296 }
297 None => {
299 let input = task.inputs().get(path).with_context(|| {
300 format!(
301 "task `{name}` does not have an input named `{path}`",
302 name = task.name()
303 )
304 })?;
305
306 let actual = value.ty();
308 let expected = input.ty();
309 if let Some(PrimitiveType::String) = expected.as_primitive()
310 && let Some(actual) = actual.as_primitive()
311 && actual != PrimitiveType::String
312 {
313 self.inputs
314 .insert(path.to_string(), value.to_string().into());
315 return Ok(());
316 }
317
318 check_input_type(document, path, input, &value)?;
319 self.inputs.insert(path.to_string(), value);
320 Ok(())
321 }
322 }
323 }
324}
325
326impl<S, V> FromIterator<(S, V)> for TaskInputs
327where
328 S: Into<String>,
329 V: Into<Value>,
330{
331 fn from_iter<T: IntoIterator<Item = (S, V)>>(iter: T) -> Self {
332 Self {
333 inputs: iter
334 .into_iter()
335 .map(|(k, v)| (k.into(), v.into()))
336 .collect(),
337 requirements: Default::default(),
338 hints: Default::default(),
339 }
340 }
341}
342
343impl Serialize for TaskInputs {
344 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
345 where
346 S: serde::Serializer,
347 {
348 let mut map = serializer.serialize_map(Some(self.inputs.len()))?;
350 for (k, v) in &self.inputs {
351 let serialized_value = crate::ValueSerializer::new(v, true);
352 map.serialize_entry(k, &serialized_value)?;
353 }
354 map.end()
355 }
356}
357
358#[derive(Default, Debug, Clone)]
360pub struct WorkflowInputs {
361 inputs: IndexMap<String, Value>,
363 calls: HashMap<String, Inputs>,
365}
366
367impl WorkflowInputs {
368 pub fn iter(&self) -> impl Iterator<Item = (&str, &Value)> + use<'_> {
370 self.inputs.iter().map(|(k, v)| (k.as_str(), v))
371 }
372
373 pub fn get(&self, name: &str) -> Option<&Value> {
375 self.inputs.get(name)
376 }
377
378 pub fn calls(&self) -> &HashMap<String, Inputs> {
380 &self.calls
381 }
382
383 pub fn calls_mut(&mut self) -> &mut HashMap<String, Inputs> {
385 &mut self.calls
386 }
387
388 pub fn set(&mut self, name: impl Into<String>, value: impl Into<Value>) -> Option<Value> {
392 self.inputs.insert(name.into(), value.into())
393 }
394
395 pub fn contains(&self, name: &str) -> bool {
399 self.inputs.contains_key(name)
400 }
401
402 pub async fn join_paths<'a>(
408 &mut self,
409 workflow: &Workflow,
410 path: impl Fn(&str) -> Result<&'a EvaluationPath>,
411 ) -> Result<()> {
412 join_paths(&mut self.inputs, path, |name| {
413 workflow.inputs().get(name).map(|input| input.ty().clone())
414 })
415 .await
416 }
417
418 pub fn validate(
423 &self,
424 document: &Document,
425 workflow: &Workflow,
426 specified: Option<&HashSet<String>>,
427 ) -> Result<()> {
428 for (name, value) in &self.inputs {
430 let input = workflow
431 .inputs()
432 .get(name)
433 .with_context(|| format!("unknown input `{name}`"))?;
434 check_input_type(document, name, input, value)?;
435 }
436
437 for (name, input) in workflow.inputs() {
439 if input.required()
440 && !self.inputs.contains_key(name)
441 && specified.map(|s| !s.contains(name)).unwrap_or(true)
442 {
443 bail!(
444 "missing required input `{name}` to workflow `{workflow}`",
445 workflow = workflow.name()
446 );
447 }
448 }
449
450 if !self.calls.is_empty() && !workflow.allows_nested_inputs() {
452 bail!(
453 "cannot specify a nested call input for workflow `{name}` as it does not allow \
454 nested inputs",
455 name = workflow.name()
456 );
457 }
458
459 for (name, inputs) in &self.calls {
461 let call = workflow.calls().get(name).with_context(|| {
462 format!(
463 "workflow `{workflow}` does not have a call named `{name}`",
464 workflow = workflow.name()
465 )
466 })?;
467
468 let document = call
471 .namespace()
472 .map(|ns| {
473 document
474 .namespace(ns)
475 .expect("namespace should be present")
476 .document()
477 })
478 .unwrap_or(document);
479
480 let inputs = match call.kind() {
482 CallKind::Task => {
483 let task = document
484 .task_by_name(call.name())
485 .expect("task should be present");
486
487 let task_inputs = inputs.as_task_inputs().with_context(|| {
488 format!("`{name}` is a call to a task, but workflow inputs were supplied")
489 })?;
490
491 task_inputs.validate(document, task, Some(call.specified()))?;
492 &task_inputs.inputs
493 }
494 CallKind::Workflow => {
495 let workflow = document.workflow().expect("should have a workflow");
496 assert_eq!(
497 workflow.name(),
498 call.name(),
499 "call name does not match workflow name"
500 );
501 let workflow_inputs = inputs.as_workflow_inputs().with_context(|| {
502 format!("`{name}` is a call to a workflow, but task inputs were supplied")
503 })?;
504
505 workflow_inputs.validate(document, workflow, Some(call.specified()))?;
506 &workflow_inputs.inputs
507 }
508 };
509
510 for input in inputs.keys() {
511 if call.specified().contains(input) {
512 bail!(
513 "cannot specify nested input `{input}` for call `{call}` as it was \
514 explicitly specified in the call itself",
515 call = call.name(),
516 );
517 }
518 }
519 }
520
521 if workflow.allows_nested_inputs() {
523 for (call, ty) in workflow.calls() {
524 let inputs = self.calls.get(call);
525
526 for (input, _) in ty
527 .inputs()
528 .iter()
529 .filter(|(n, i)| i.required() && !ty.specified().contains(*n))
530 {
531 if !inputs.map(|i| i.get(input).is_some()).unwrap_or(false) {
532 bail!("missing required input `{input}` for call `{call}`");
533 }
534 }
535 }
536 }
537
538 Ok(())
539 }
540
541 fn set_path_value(
548 &mut self,
549 document: &Document,
550 workflow: &Workflow,
551 path: &str,
552 value: Value,
553 ) -> Result<()> {
554 match path.split_once('.') {
555 Some((name, remainder)) => {
556 if !workflow.allows_nested_inputs() {
558 bail!(
559 "cannot specify a nested call input for workflow `{workflow}` as it does \
560 not allow nested inputs",
561 workflow = workflow.name()
562 );
563 }
564
565 let call = workflow.calls().get(name).with_context(|| {
567 format!(
568 "workflow `{workflow}` does not have a call named `{name}`",
569 workflow = workflow.name()
570 )
571 })?;
572
573 let inputs =
575 self.calls
576 .entry(name.to_string())
577 .or_insert_with(|| match call.kind() {
578 CallKind::Task => Inputs::Task(Default::default()),
579 CallKind::Workflow => Inputs::Workflow(Default::default()),
580 });
581
582 let document = call
585 .namespace()
586 .map(|ns| {
587 document
588 .namespace(ns)
589 .expect("namespace should be present")
590 .document()
591 })
592 .unwrap_or(document);
593
594 let next = remainder
595 .split_once('.')
596 .map(|(n, _)| n)
597 .unwrap_or(remainder);
598 if call.specified().contains(next) {
599 bail!(
600 "cannot specify nested input `{next}` for call `{name}` as it was \
601 explicitly specified in the call itself",
602 );
603 }
604
605 match call.kind() {
607 CallKind::Task => {
608 let task = document
609 .task_by_name(call.name())
610 .expect("task should be present");
611 inputs
612 .as_task_inputs_mut()
613 .expect("should be a task input")
614 .set_path_value(document, task, remainder, value)
615 }
616 CallKind::Workflow => {
617 let workflow = document.workflow().expect("should have a workflow");
618 assert_eq!(
619 workflow.name(),
620 call.name(),
621 "call name does not match workflow name"
622 );
623 inputs
624 .as_workflow_inputs_mut()
625 .expect("should be a task input")
626 .set_path_value(document, workflow, remainder, value)
627 }
628 }
629 }
630 None => {
631 let input = workflow.inputs().get(path).with_context(|| {
632 format!(
633 "workflow `{workflow}` does not have an input named `{path}`",
634 workflow = workflow.name()
635 )
636 })?;
637
638 let actual = value.ty();
640 let expected = input.ty();
641 if let Some(PrimitiveType::String) = expected.as_primitive()
642 && let Some(actual) = actual.as_primitive()
643 && actual != PrimitiveType::String
644 {
645 self.inputs
646 .insert(path.to_string(), value.to_string().into());
647 return Ok(());
648 }
649
650 check_input_type(document, path, input, &value)?;
651 self.inputs.insert(path.to_string(), value);
652 Ok(())
653 }
654 }
655 }
656}
657
658impl<S, V> FromIterator<(S, V)> for WorkflowInputs
659where
660 S: Into<String>,
661 V: Into<Value>,
662{
663 fn from_iter<T: IntoIterator<Item = (S, V)>>(iter: T) -> Self {
664 Self {
665 inputs: iter
666 .into_iter()
667 .map(|(k, v)| (k.into(), v.into()))
668 .collect(),
669 calls: Default::default(),
670 }
671 }
672}
673
674impl Serialize for WorkflowInputs {
675 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
676 where
677 S: serde::Serializer,
678 {
679 let mut map = serializer.serialize_map(Some(self.inputs.len()))?;
682 for (k, v) in &self.inputs {
683 let serialized_value = crate::ValueSerializer::new(v, true);
684 map.serialize_entry(k, &serialized_value)?;
685 }
686 map.end()
687 }
688}
689
690#[derive(Debug, Clone)]
692pub enum Inputs {
693 Task(TaskInputs),
695 Workflow(WorkflowInputs),
697}
698
699impl Inputs {
700 pub fn parse(document: &Document, path: impl AsRef<Path>) -> Result<Option<(String, Self)>> {
714 let path = path.as_ref();
715
716 match path.extension().and_then(|ext| ext.to_str()) {
717 Some("json") => Self::parse_json(document, path),
718 Some("yml") | Some("yaml") => Self::parse_yaml(document, path),
719 ext => bail!(
720 "unsupported file extension: `{ext}`; the supported formats are JSON (`.json`) \
721 and YAML (`.yaml` and `.yml`)",
722 ext = ext.unwrap_or("")
723 ),
724 }
725 .with_context(|| format!("failed to parse input file `{path}`", path = path.display()))
726 }
727
728 pub fn parse_json(
737 document: &Document,
738 path: impl AsRef<Path>,
739 ) -> Result<Option<(String, Self)>> {
740 let path = path.as_ref();
741
742 let file = File::open(path).with_context(|| {
743 format!("failed to open input file `{path}`", path = path.display())
744 })?;
745
746 let reader = BufReader::new(file);
748
749 let map = std::mem::take(
750 serde_json::from_reader::<_, JsonValue>(reader)?
751 .as_object_mut()
752 .with_context(|| {
753 format!(
754 "expected input file `{path}` to contain a JSON object",
755 path = path.display()
756 )
757 })?,
758 );
759
760 Self::parse_object(document, map)
761 }
762
763 pub fn parse_yaml(
772 document: &Document,
773 path: impl AsRef<Path>,
774 ) -> Result<Option<(String, Self)>> {
775 let path = path.as_ref();
776
777 let file = File::open(path).with_context(|| {
778 format!("failed to open input file `{path}`", path = path.display())
779 })?;
780
781 let reader = BufReader::new(file);
783 let yaml = serde_yaml_ng::from_reader::<_, YamlValue>(reader)?;
784
785 let mut json = serde_json::to_value(yaml).with_context(|| {
787 format!(
788 "failed to convert YAML to JSON for processing `{path}`",
789 path = path.display()
790 )
791 })?;
792
793 let object = std::mem::take(json.as_object_mut().with_context(|| {
794 format!(
795 "expected input file `{path}` to contain a YAML mapping",
796 path = path.display()
797 )
798 })?);
799
800 Self::parse_object(document, object)
801 }
802
803 pub fn get(&self, name: &str) -> Option<&Value> {
805 match self {
806 Self::Task(t) => t.inputs.get(name),
807 Self::Workflow(w) => w.inputs.get(name),
808 }
809 }
810
811 pub fn set(&mut self, name: impl Into<String>, value: impl Into<Value>) -> Option<Value> {
815 match self {
816 Self::Task(inputs) => inputs.set(name, value),
817 Self::Workflow(inputs) => inputs.set(name, value),
818 }
819 }
820
821 pub fn as_task_inputs(&self) -> Option<&TaskInputs> {
825 match self {
826 Self::Task(inputs) => Some(inputs),
827 Self::Workflow(_) => None,
828 }
829 }
830
831 pub fn as_task_inputs_mut(&mut self) -> Option<&mut TaskInputs> {
835 match self {
836 Self::Task(inputs) => Some(inputs),
837 Self::Workflow(_) => None,
838 }
839 }
840
841 pub fn unwrap_task_inputs(self) -> TaskInputs {
847 match self {
848 Self::Task(inputs) => inputs,
849 Self::Workflow(_) => panic!("inputs are for a workflow"),
850 }
851 }
852
853 pub fn as_workflow_inputs(&self) -> Option<&WorkflowInputs> {
857 match self {
858 Self::Task(_) => None,
859 Self::Workflow(inputs) => Some(inputs),
860 }
861 }
862
863 pub fn as_workflow_inputs_mut(&mut self) -> Option<&mut WorkflowInputs> {
867 match self {
868 Self::Task(_) => None,
869 Self::Workflow(inputs) => Some(inputs),
870 }
871 }
872
873 pub fn unwrap_workflow_inputs(self) -> WorkflowInputs {
879 match self {
880 Self::Task(_) => panic!("inputs are for a task"),
881 Self::Workflow(inputs) => inputs,
882 }
883 }
884
885 pub fn parse_object(document: &Document, object: JsonMap) -> Result<Option<(String, Self)>> {
891 let (key, name) = match object.iter().next() {
893 Some((key, _)) => match key.split_once('.') {
894 Some((name, _remainder)) => (key, name),
895 None => {
896 bail!(
897 "invalid input key `{key}`: expected the value to be prefixed with the \
898 workflow or task name",
899 )
900 }
901 },
902 None => {
904 return Ok(None);
905 }
906 };
907
908 match (document.task_by_name(name), document.workflow()) {
909 (Some(task), _) => Ok(Some(Self::parse_task_inputs(document, task, object)?)),
910 (None, Some(workflow)) if workflow.name() == name => Ok(Some(
911 Self::parse_workflow_inputs(document, workflow, object)?,
912 )),
913 _ => bail!(
914 "invalid input key `{key}`: a task or workflow named `{name}` does not exist in \
915 the document"
916 ),
917 }
918 }
919
920 fn parse_task_inputs(
922 document: &Document,
923 task: &Task,
924 object: JsonMap,
925 ) -> Result<(String, Self)> {
926 let mut inputs = TaskInputs::default();
927 for (key, value) in object {
928 let value = serde_json::from_value(value)
930 .with_context(|| format!("invalid input key `{key}`"))?;
931
932 match key.split_once(".") {
933 Some((prefix, remainder)) if prefix == task.name() => {
934 inputs
935 .set_path_value(document, task, remainder, value)
936 .with_context(|| format!("invalid input key `{key}`"))?;
937 }
938 _ => {
939 bail!(
940 "invalid input key `{key}`: expected key to be prefixed with `{task}`",
941 task = task.name()
942 );
943 }
944 }
945 }
946
947 Ok((task.name().to_string(), Inputs::Task(inputs)))
948 }
949
950 fn parse_workflow_inputs(
952 document: &Document,
953 workflow: &Workflow,
954 object: JsonMap,
955 ) -> Result<(String, Self)> {
956 let mut inputs = WorkflowInputs::default();
957 for (key, value) in object {
958 let value = serde_json::from_value(value)
960 .with_context(|| format!("invalid input key `{key}`"))?;
961
962 match key.split_once(".") {
963 Some((prefix, remainder)) if prefix == workflow.name() => {
964 inputs
965 .set_path_value(document, workflow, remainder, value)
966 .with_context(|| format!("invalid input key `{key}`"))?;
967 }
968 _ => {
969 bail!(
970 "invalid input key `{key}`: expected key to be prefixed with `{workflow}`",
971 workflow = workflow.name()
972 );
973 }
974 }
975 }
976
977 Ok((workflow.name().to_string(), Inputs::Workflow(inputs)))
978 }
979}
980
981impl From<TaskInputs> for Inputs {
982 fn from(inputs: TaskInputs) -> Self {
983 Self::Task(inputs)
984 }
985}
986
987impl From<WorkflowInputs> for Inputs {
988 fn from(inputs: WorkflowInputs) -> Self {
989 Self::Workflow(inputs)
990 }
991}