dora_core/descriptor/
validate.rs

1use crate::{
2    adjust_shared_library_path,
3    descriptor::{self, source_is_url},
4    get_python_path,
5};
6
7use dora_message::{
8    config::{Input, InputMapping, UserInputMapping},
9    descriptor::{CoreNodeKind, DYNAMIC_SOURCE, OperatorSource, ResolvedNode, SHELL_SOURCE},
10    id::{DataId, NodeId, OperatorId},
11};
12use eyre::{Context, bail, eyre};
13use std::{collections::BTreeMap, path::Path, process::Command};
14use tracing::info;
15
16use super::{Descriptor, DescriptorExt, resolve_path};
17const VERSION: &str = env!("CARGO_PKG_VERSION");
18
19pub fn check_dataflow(
20    dataflow: &Descriptor,
21    working_dir: &Path,
22    remote_daemon_id: Option<&[&str]>,
23    coordinator_is_remote: bool,
24) -> eyre::Result<()> {
25    let nodes = dataflow.resolve_aliases_and_set_defaults()?;
26    let mut has_python_operator = false;
27
28    // check that nodes and operators exist
29    for node in nodes.values() {
30        match &node.kind {
31            descriptor::CoreNodeKind::Custom(custom) => match &custom.source {
32                dora_message::descriptor::NodeSource::Local => match custom.path.as_str() {
33                    SHELL_SOURCE => (),
34                    DYNAMIC_SOURCE => (),
35                    source => {
36                        if source_is_url(source) {
37                            info!("{source} is a URL."); // TODO: Implement url check.
38                        } else if let Some(remote_daemon_id) = remote_daemon_id {
39                            if let Some(deploy) = &node.deploy {
40                                if let Some(machine) = &deploy.machine {
41                                    if remote_daemon_id.contains(&machine.as_str())
42                                        || coordinator_is_remote
43                                    {
44                                        info!("skipping path check for remote node `{}`", node.id);
45                                    }
46                                }
47                            }
48                        } else if custom.build.is_some() {
49                            info!("skipping path check for node with build command");
50                        } else {
51                            resolve_path(source, working_dir).wrap_err_with(|| {
52                                format!("Could not find source path `{source}`")
53                            })?;
54                        };
55                    }
56                },
57                dora_message::descriptor::NodeSource::GitBranch { .. } => {
58                    info!("skipping check for node with git source");
59                }
60            },
61            descriptor::CoreNodeKind::Runtime(node) => {
62                for operator_definition in &node.operators {
63                    match &operator_definition.config.source {
64                        OperatorSource::SharedLibrary(path) => {
65                            if source_is_url(path) {
66                                info!("{path} is a URL."); // TODO: Implement url check.
67                            } else if operator_definition.config.build.is_some() {
68                                info!("skipping path check for operator with build command");
69                            } else {
70                                let path = adjust_shared_library_path(Path::new(&path))?;
71                                if !working_dir.join(&path).exists() {
72                                    bail!("no shared library at `{}`", path.display());
73                                }
74                            }
75                        }
76                        OperatorSource::Python(python_source) => {
77                            has_python_operator = true;
78                            let path = &python_source.source;
79                            if source_is_url(path) {
80                                info!("{path} is a URL."); // TODO: Implement url check.
81                            } else if !working_dir.join(path).exists() {
82                                bail!("no Python library at `{path}`");
83                            }
84                        }
85                        OperatorSource::Wasm(path) => {
86                            if source_is_url(path) {
87                                info!("{path} is a URL."); // TODO: Implement url check.
88                            } else if !working_dir.join(path).exists() {
89                                bail!("no WASM library at `{path}`");
90                            }
91                        }
92                    }
93                }
94            }
95        }
96    }
97
98    // check that all inputs mappings point to an existing output
99    for node in nodes.values() {
100        match &node.kind {
101            descriptor::CoreNodeKind::Custom(custom_node) => {
102                for (input_id, input) in &custom_node.run_config.inputs {
103                    check_input(input, &nodes, &format!("{}/{input_id}", node.id))?;
104                }
105            }
106            descriptor::CoreNodeKind::Runtime(runtime_node) => {
107                for operator_definition in &runtime_node.operators {
108                    for (input_id, input) in &operator_definition.config.inputs {
109                        check_input(
110                            input,
111                            &nodes,
112                            &format!("{}/{}/{input_id}", operator_definition.id, node.id),
113                        )?;
114                    }
115                }
116            }
117        };
118    }
119
120    // Check that nodes can resolve `send_stdout_as`
121    for node in nodes.values() {
122        node.send_stdout_as()
123            .context("Could not resolve `send_stdout_as` configuration")?;
124    }
125
126    if has_python_operator {
127        check_python_runtime()?;
128    }
129
130    Ok(())
131}
132
133pub trait ResolvedNodeExt {
134    fn send_stdout_as(&self) -> eyre::Result<Option<String>>;
135}
136
137impl ResolvedNodeExt for ResolvedNode {
138    fn send_stdout_as(&self) -> eyre::Result<Option<String>> {
139        match &self.kind {
140            // TODO: Split stdout between operators
141            CoreNodeKind::Runtime(n) => {
142                let count = n
143                    .operators
144                    .iter()
145                    .filter(|op| op.config.send_stdout_as.is_some())
146                    .count();
147                if count == 1 && n.operators.len() > 1 {
148                    tracing::warn!(
149                        "All stdout from all operators of a runtime are going to be sent in the selected `send_stdout_as` operator."
150                    )
151                } else if count > 1 {
152                    return Err(eyre!(
153                        "More than one `send_stdout_as` entries for a runtime node. Please only use one `send_stdout_as` per runtime."
154                    ));
155                }
156                Ok(n.operators.iter().find_map(|op| {
157                    op.config
158                        .send_stdout_as
159                        .clone()
160                        .map(|stdout| format!("{}/{}", op.id, stdout))
161                }))
162            }
163            CoreNodeKind::Custom(n) => Ok(n.send_stdout_as.clone()),
164        }
165    }
166}
167
168fn check_input(
169    input: &Input,
170    nodes: &BTreeMap<NodeId, super::ResolvedNode>,
171    input_id_str: &str,
172) -> Result<(), eyre::ErrReport> {
173    match &input.mapping {
174        InputMapping::Timer { interval: _ } => {}
175        InputMapping::User(UserInputMapping { source, output }) => {
176            let source_node = nodes.values().find(|n| &n.id == source).ok_or_else(|| {
177                eyre!("source node `{source}` mapped to input `{input_id_str}` does not exist",)
178            })?;
179            match &source_node.kind {
180                CoreNodeKind::Custom(custom_node) => {
181                    if !custom_node.run_config.outputs.contains(output) {
182                        bail!(
183                            "output `{source}/{output}` mapped to \
184                            input `{input_id_str}` does not exist",
185                        );
186                    }
187                }
188                CoreNodeKind::Runtime(runtime) => {
189                    let (operator_id, output) = output.split_once('/').unwrap_or_default();
190                    let operator_id = OperatorId::from(operator_id.to_owned());
191                    let output = DataId::from(output.to_owned());
192
193                    let operator = runtime
194                        .operators
195                        .iter()
196                        .find(|o| o.id == operator_id)
197                        .ok_or_else(|| {
198                            eyre!(
199                                "source operator `{source}/{operator_id}` used \
200                                for input `{input_id_str}` does not exist",
201                            )
202                        })?;
203
204                    if !operator.config.outputs.contains(&output) {
205                        bail!(
206                            "output `{source}/{operator_id}/{output}` mapped to \
207                            input `{input_id_str}` does not exist",
208                        );
209                    }
210                }
211            }
212        }
213    };
214    Ok(())
215}
216
217fn check_python_runtime() -> eyre::Result<()> {
218    // Check if python dora-rs is installed and match cli version
219    let reinstall_command =
220        format!("Please reinstall it with: `pip install dora-rs=={VERSION} --force`");
221    let mut command = Command::new(get_python_path().context("Could not get python binary")?);
222    command.args([
223        "-c",
224        &format!(
225            "
226import dora;
227assert dora.__version__=='{VERSION}',  'Python dora-rs should be {VERSION}, but current version is %s. {reinstall_command}' % (dora.__version__)
228        "
229        ),
230    ]);
231    let mut result = command
232        .spawn()
233        .wrap_err("Could not spawn python dora-rs command.")?;
234    let status = result
235        .wait()
236        .wrap_err("Could not get exit status when checking python dora-rs")?;
237
238    if !status.success() {
239        bail!("Something went wrong with Python dora-rs. {reinstall_command}")
240    }
241
242    Ok(())
243}