dora_core/descriptor/
validate.rs1use crate::{
2 adjust_shared_library_path,
3 descriptor::{self, source_is_url},
4 get_python_path,
5};
6
7use dora_message::{
8 config::{Input, InputMapping, UserInputMapping},
9 descriptor::{CoreNodeKind, DYNAMIC_SOURCE, OperatorSource, ResolvedNode, SHELL_SOURCE},
10 id::{DataId, NodeId, OperatorId},
11};
12use eyre::{Context, bail, eyre};
13use std::{collections::BTreeMap, path::Path, process::Command};
14use tracing::info;
15
16use super::{Descriptor, DescriptorExt, resolve_path};
/// Version of this crate, read from Cargo's `CARGO_PKG_VERSION` at compile time.
///
/// `check_python_runtime` requires the installed Python `dora-rs` package to
/// report exactly this version.
const VERSION: &str = env!("CARGO_PKG_VERSION");
18
19pub fn check_dataflow(
20 dataflow: &Descriptor,
21 working_dir: &Path,
22 remote_daemon_id: Option<&[&str]>,
23 coordinator_is_remote: bool,
24) -> eyre::Result<()> {
25 let nodes = dataflow.resolve_aliases_and_set_defaults()?;
26 let mut has_python_operator = false;
27
28 for node in nodes.values() {
30 match &node.kind {
31 descriptor::CoreNodeKind::Custom(custom) => match &custom.source {
32 dora_message::descriptor::NodeSource::Local => match custom.path.as_str() {
33 SHELL_SOURCE => (),
34 DYNAMIC_SOURCE => (),
35 source => {
36 if source_is_url(source) {
37 info!("{source} is a URL."); } else if let Some(remote_daemon_id) = remote_daemon_id {
39 if let Some(deploy) = &node.deploy {
40 if let Some(machine) = &deploy.machine {
41 if remote_daemon_id.contains(&machine.as_str())
42 || coordinator_is_remote
43 {
44 info!("skipping path check for remote node `{}`", node.id);
45 }
46 }
47 }
48 } else if custom.build.is_some() {
49 info!("skipping path check for node with build command");
50 } else {
51 resolve_path(source, working_dir).wrap_err_with(|| {
52 format!("Could not find source path `{source}`")
53 })?;
54 };
55 }
56 },
57 dora_message::descriptor::NodeSource::GitBranch { .. } => {
58 info!("skipping check for node with git source");
59 }
60 },
61 descriptor::CoreNodeKind::Runtime(node) => {
62 for operator_definition in &node.operators {
63 match &operator_definition.config.source {
64 OperatorSource::SharedLibrary(path) => {
65 if source_is_url(path) {
66 info!("{path} is a URL."); } else if operator_definition.config.build.is_some() {
68 info!("skipping path check for operator with build command");
69 } else {
70 let path = adjust_shared_library_path(Path::new(&path))?;
71 if !working_dir.join(&path).exists() {
72 bail!("no shared library at `{}`", path.display());
73 }
74 }
75 }
76 OperatorSource::Python(python_source) => {
77 has_python_operator = true;
78 let path = &python_source.source;
79 if source_is_url(path) {
80 info!("{path} is a URL."); } else if !working_dir.join(path).exists() {
82 bail!("no Python library at `{path}`");
83 }
84 }
85 OperatorSource::Wasm(path) => {
86 if source_is_url(path) {
87 info!("{path} is a URL."); } else if !working_dir.join(path).exists() {
89 bail!("no WASM library at `{path}`");
90 }
91 }
92 }
93 }
94 }
95 }
96 }
97
98 for node in nodes.values() {
100 match &node.kind {
101 descriptor::CoreNodeKind::Custom(custom_node) => {
102 for (input_id, input) in &custom_node.run_config.inputs {
103 check_input(input, &nodes, &format!("{}/{input_id}", node.id))?;
104 }
105 }
106 descriptor::CoreNodeKind::Runtime(runtime_node) => {
107 for operator_definition in &runtime_node.operators {
108 for (input_id, input) in &operator_definition.config.inputs {
109 check_input(
110 input,
111 &nodes,
112 &format!("{}/{}/{input_id}", operator_definition.id, node.id),
113 )?;
114 }
115 }
116 }
117 };
118 }
119
120 for node in nodes.values() {
122 node.send_stdout_as()
123 .context("Could not resolve `send_stdout_as` configuration")?;
124 }
125
126 if has_python_operator {
127 check_python_runtime()?;
128 }
129
130 Ok(())
131}
132
133pub trait ResolvedNodeExt {
134 fn send_stdout_as(&self) -> eyre::Result<Option<String>>;
135}
136
137impl ResolvedNodeExt for ResolvedNode {
138 fn send_stdout_as(&self) -> eyre::Result<Option<String>> {
139 match &self.kind {
140 CoreNodeKind::Runtime(n) => {
142 let count = n
143 .operators
144 .iter()
145 .filter(|op| op.config.send_stdout_as.is_some())
146 .count();
147 if count == 1 && n.operators.len() > 1 {
148 tracing::warn!(
149 "All stdout from all operators of a runtime are going to be sent in the selected `send_stdout_as` operator."
150 )
151 } else if count > 1 {
152 return Err(eyre!(
153 "More than one `send_stdout_as` entries for a runtime node. Please only use one `send_stdout_as` per runtime."
154 ));
155 }
156 Ok(n.operators.iter().find_map(|op| {
157 op.config
158 .send_stdout_as
159 .clone()
160 .map(|stdout| format!("{}/{}", op.id, stdout))
161 }))
162 }
163 CoreNodeKind::Custom(n) => Ok(n.send_stdout_as.clone()),
164 }
165 }
166}
167
168fn check_input(
169 input: &Input,
170 nodes: &BTreeMap<NodeId, super::ResolvedNode>,
171 input_id_str: &str,
172) -> Result<(), eyre::ErrReport> {
173 match &input.mapping {
174 InputMapping::Timer { interval: _ } => {}
175 InputMapping::User(UserInputMapping { source, output }) => {
176 let source_node = nodes.values().find(|n| &n.id == source).ok_or_else(|| {
177 eyre!("source node `{source}` mapped to input `{input_id_str}` does not exist",)
178 })?;
179 match &source_node.kind {
180 CoreNodeKind::Custom(custom_node) => {
181 if !custom_node.run_config.outputs.contains(output) {
182 bail!(
183 "output `{source}/{output}` mapped to \
184 input `{input_id_str}` does not exist",
185 );
186 }
187 }
188 CoreNodeKind::Runtime(runtime) => {
189 let (operator_id, output) = output.split_once('/').unwrap_or_default();
190 let operator_id = OperatorId::from(operator_id.to_owned());
191 let output = DataId::from(output.to_owned());
192
193 let operator = runtime
194 .operators
195 .iter()
196 .find(|o| o.id == operator_id)
197 .ok_or_else(|| {
198 eyre!(
199 "source operator `{source}/{operator_id}` used \
200 for input `{input_id_str}` does not exist",
201 )
202 })?;
203
204 if !operator.config.outputs.contains(&output) {
205 bail!(
206 "output `{source}/{operator_id}/{output}` mapped to \
207 input `{input_id_str}` does not exist",
208 );
209 }
210 }
211 }
212 }
213 };
214 Ok(())
215}
216
217fn check_python_runtime() -> eyre::Result<()> {
218 let reinstall_command =
220 format!("Please reinstall it with: `pip install dora-rs=={VERSION} --force`");
221 let mut command = Command::new(get_python_path().context("Could not get python binary")?);
222 command.args([
223 "-c",
224 &format!(
225 "
226import dora;
227assert dora.__version__=='{VERSION}', 'Python dora-rs should be {VERSION}, but current version is %s. {reinstall_command}' % (dora.__version__)
228 "
229 ),
230 ]);
231 let mut result = command
232 .spawn()
233 .wrap_err("Could not spawn python dora-rs command.")?;
234 let status = result
235 .wait()
236 .wrap_err("Could not get exit status when checking python dora-rs")?;
237
238 if !status.success() {
239 bail!("Something went wrong with Python dora-rs. {reinstall_command}")
240 }
241
242 Ok(())
243}