Skip to main content

dora_core/descriptor/
mod.rs

1use dora_message::{
2    config::{Input, InputMapping, NodeRunConfig},
3    descriptor::{GitRepoRev, NodeSource},
4    id::{DataId, NodeId, OperatorId},
5};
6use eyre::{Context, OptionExt, Result, bail};
7use std::{
8    collections::{BTreeMap, HashMap},
9    env::consts::EXE_EXTENSION,
10    path::{Path, PathBuf},
11    process::Stdio,
12};
13
14// reexport for compatibility
15pub use dora_message::descriptor::{
16    CoreNodeKind, CustomNode, DYNAMIC_SOURCE, Descriptor, Node, OperatorConfig, OperatorDefinition,
17    OperatorSource, PythonSource, ResolvedNode, RuntimeNode, SHELL_SOURCE,
18    SingleOperatorDefinition,
19};
20pub use validate::ResolvedNodeExt;
21pub use visualize::collect_dora_timers;
22
23mod validate;
24mod visualize;
25
/// Extension methods for [`Descriptor`]: resolving nodes, rendering, loading, and validation.
pub trait DescriptorExt {
    /// Resolves every node of the descriptor into a [`ResolvedNode`], keyed by its node ID.
    fn resolve_aliases_and_set_defaults(&self) -> eyre::Result<BTreeMap<NodeId, ResolvedNode>>;
    /// Resolves the nodes and renders them as a flowchart string.
    fn visualize_as_mermaid(&self) -> eyre::Result<String>;
    /// Reads the file at `path` synchronously and parses it as a descriptor.
    fn blocking_read(path: &Path) -> eyre::Result<Descriptor>;
    /// Parses a descriptor from the raw bytes of a YAML document.
    fn parse(buf: Vec<u8>) -> eyre::Result<Descriptor>;
    /// Validates the dataflow, using `working_dir` as the validation working directory.
    fn check(&self, working_dir: &Path) -> eyre::Result<()>;
    /// Like [`DescriptorExt::check`], but additionally tells the validation whether the
    /// coordinator is remote.
    fn check_in_daemon(&self, working_dir: &Path, coordinator_is_remote: bool) -> eyre::Result<()>;
}
34
/// Operator ID used for a node's single `operator` definition when it does not set an `id`.
pub const SINGLE_OPERATOR_DEFAULT_ID: &str = "op";
36
37impl DescriptorExt for Descriptor {
38    fn resolve_aliases_and_set_defaults(&self) -> eyre::Result<BTreeMap<NodeId, ResolvedNode>> {
39        let default_op_id = OperatorId::from(SINGLE_OPERATOR_DEFAULT_ID.to_string());
40
41        let single_operator_nodes: HashMap<_, _> = self
42            .nodes
43            .iter()
44            .filter_map(|n| {
45                n.operator
46                    .as_ref()
47                    .map(|op| (&n.id, op.id.as_ref().unwrap_or(&default_op_id)))
48            })
49            .collect();
50
51        let mut resolved = BTreeMap::new();
52        for mut node in self.nodes.clone() {
53            // adjust input mappings
54            let mut node_kind = node_kind_mut(&mut node)?;
55            let input_mappings: Vec<_> = match &mut node_kind {
56                NodeKindMut::Standard { inputs, .. } => inputs.values_mut().collect(),
57                NodeKindMut::Runtime(node) => node
58                    .operators
59                    .iter_mut()
60                    .flat_map(|op| op.config.inputs.values_mut())
61                    .collect(),
62                NodeKindMut::Custom(node) => node.run_config.inputs.values_mut().collect(),
63                NodeKindMut::Operator(operator) => operator.config.inputs.values_mut().collect(),
64            };
65            for mapping in input_mappings
66                .into_iter()
67                .filter_map(|i| match &mut i.mapping {
68                    InputMapping::Timer { .. } => None,
69                    InputMapping::User(m) => Some(m),
70                })
71            {
72                if let Some(op_name) = single_operator_nodes.get(&mapping.source).copied() {
73                    mapping.output = DataId::from(format!("{op_name}/{}", mapping.output));
74                }
75            }
76
77            // resolve nodes
78            let kind = match node_kind {
79                NodeKindMut::Standard {
80                    path,
81                    source,
82                    inputs: _,
83                } => CoreNodeKind::Custom(CustomNode {
84                    path: path.clone(),
85                    source,
86                    args: node.args,
87                    build: node.build,
88                    send_stdout_as: node.send_stdout_as,
89                    run_config: NodeRunConfig {
90                        inputs: node.inputs,
91                        outputs: node.outputs,
92                    },
93                    envs: None,
94                    restart_policy: node.restart_policy,
95                }),
96                NodeKindMut::Custom(node) => CoreNodeKind::Custom(node.clone()),
97                NodeKindMut::Runtime(node) => CoreNodeKind::Runtime(node.clone()),
98                NodeKindMut::Operator(op) => CoreNodeKind::Runtime(RuntimeNode {
99                    operators: vec![OperatorDefinition {
100                        id: op.id.clone().unwrap_or_else(|| default_op_id.clone()),
101                        config: op.config.clone(),
102                    }],
103                }),
104            };
105
106            let env = match (self.env.clone(), node.env) {
107                (None, node_env) => node_env,
108                (Some(mut self_env), node_env) => {
109                    self_env.extend(node_env.unwrap_or_default());
110                    Some(self_env)
111                }
112            };
113
114            resolved.insert(
115                node.id.clone(),
116                ResolvedNode {
117                    id: node.id,
118                    name: node.name,
119                    description: node.description,
120                    env,
121                    deploy: node.deploy,
122                    kind,
123                },
124            );
125        }
126
127        Ok(resolved)
128    }
129
130    fn visualize_as_mermaid(&self) -> eyre::Result<String> {
131        let resolved = self.resolve_aliases_and_set_defaults()?;
132        let flowchart = visualize::visualize_nodes(&resolved);
133
134        Ok(flowchart)
135    }
136
137    fn blocking_read(path: &Path) -> eyre::Result<Descriptor> {
138        let buf = std::fs::read(path).context("failed to open given file")?;
139        Descriptor::parse(buf)
140    }
141
142    fn parse(buf: Vec<u8>) -> eyre::Result<Descriptor> {
143        serde_yaml::from_slice(&buf).context("failed to parse given descriptor")
144    }
145
146    fn check(&self, working_dir: &Path) -> eyre::Result<()> {
147        validate::check_dataflow(self, working_dir, None, false)
148            .wrap_err("Dataflow could not be validated.")
149    }
150
151    fn check_in_daemon(&self, working_dir: &Path, coordinator_is_remote: bool) -> eyre::Result<()> {
152        validate::check_dataflow(self, working_dir, None, coordinator_is_remote)
153            .wrap_err("Dataflow could not be validated.")
154    }
155}
156
157pub async fn read_as_descriptor(path: &Path) -> eyre::Result<Descriptor> {
158    let buf = tokio::fs::read(path)
159        .await
160        .context("failed to open given file")?;
161    Descriptor::parse(buf)
162}
163
164fn node_kind_mut(node: &mut Node) -> eyre::Result<NodeKindMut> {
165    match node.kind()? {
166        NodeKind::Standard(_) => {
167            let source = match (&node.git, &node.branch, &node.tag, &node.rev) {
168                (None, None, None, None) => NodeSource::Local,
169                (Some(repo), branch, tag, rev) => {
170                    let rev = match (branch, tag, rev) {
171                        (None, None, None) => None,
172                        (Some(branch), None, None) => Some(GitRepoRev::Branch(branch.clone())),
173                        (None, Some(tag), None) => Some(GitRepoRev::Tag(tag.clone())),
174                        (None, None, Some(rev)) => Some(GitRepoRev::Rev(rev.clone())),
175                        other @ (_, _, _) => {
176                            eyre::bail!(
177                                "only one of `branch`, `tag`, and `rev` are allowed (got {other:?})"
178                            )
179                        }
180                    };
181                    NodeSource::GitBranch {
182                        repo: repo.clone(),
183                        rev,
184                    }
185                }
186                (None, _, _, _) => {
187                    eyre::bail!("`git` source required when using branch, tag, or rev")
188                }
189            };
190
191            Ok(NodeKindMut::Standard {
192                path: node.path.as_ref().ok_or_eyre("missing `path` attribute")?,
193                source,
194                inputs: &mut node.inputs,
195            })
196        }
197        NodeKind::Runtime(_) => node
198            .operators
199            .as_mut()
200            .map(NodeKindMut::Runtime)
201            .ok_or_eyre("no operators"),
202        NodeKind::Custom(_) => node
203            .custom
204            .as_mut()
205            .map(NodeKindMut::Custom)
206            .ok_or_eyre("no custom"),
207        NodeKind::Operator(_) => node
208            .operator
209            .as_mut()
210            .map(NodeKindMut::Operator)
211            .ok_or_eyre("no operator"),
212    }
213}
214
/// Returns `true` when `source` contains a `://` scheme separator.
pub fn source_is_url(source: &str) -> bool {
    source.find("://").is_some()
}
218
219pub fn resolve_path(source: &str, working_dir: &Path) -> Result<PathBuf> {
220    let path = Path::new(&source);
221    let path = if path.extension().is_none() {
222        path.with_extension(EXE_EXTENSION)
223    } else {
224        path.to_owned()
225    };
226
227    // Search path within current working directory
228    if let Ok(abs_path) = working_dir.join(&path).canonicalize() {
229        Ok(abs_path)
230    // Search path within $PATH
231    } else if which::which("uv").is_ok() {
232        // spawn: uv run which <path>
233        let which = if cfg!(windows) { "where" } else { "which" };
234        let output = std::process::Command::new("uv")
235            .arg("run")
236            .arg(which)
237            .arg(&path)
238            .stdout(Stdio::null())
239            .stderr(Stdio::null())
240            .output()
241            .context("Could not run `uv run` to find binary")?;
242        if output.status.success() {
243            Ok(path)
244        } else if let Ok(abs_path) = which::which(&path) {
245            Ok(abs_path)
246        } else {
247            bail!("Could not find source path {}", path.display())
248        }
249    } else if let Ok(abs_path) = which::which(&path) {
250        Ok(abs_path)
251    } else {
252        bail!("Could not find source path {}", path.display())
253    }
254}
255
/// Extension trait for classifying a [`Node`].
pub trait NodeExt {
    /// Returns which kind of node this is, or an error if the kind cannot be determined.
    fn kind(&self) -> eyre::Result<NodeKind>;
}
259
260impl NodeExt for Node {
261    fn kind(&self) -> eyre::Result<NodeKind> {
262        match (&self.path, &self.operators, &self.custom, &self.operator) {
263            (None, None, None, None) => {
264                eyre::bail!(
265                    "node `{}` requires a `path`, `custom`, or `operators` field",
266                    self.id
267                )
268            }
269            (None, None, None, Some(operator)) => Ok(NodeKind::Operator(operator)),
270            (None, None, Some(custom), None) => Ok(NodeKind::Custom(custom)),
271            (None, Some(runtime), None, None) => Ok(NodeKind::Runtime(runtime)),
272            (Some(path), None, None, None) => Ok(NodeKind::Standard(path)),
273            _ => {
274                eyre::bail!(
275                    "node `{}` has multiple exclusive fields set, only one of `path`, `custom`, `operators` and `operator` is allowed",
276                    self.id
277                )
278            }
279        }
280    }
281}
282
/// Borrowed view of the field that determines a node's kind.
#[derive(Debug)]
pub enum NodeKind<'a> {
    /// Node defined through its `path` field.
    Standard(&'a String),
    /// Dora runtime node
    Runtime(&'a RuntimeNode),
    /// Node defined through its `custom` field.
    Custom(&'a CustomNode),
    /// Node defined through its single `operator` field.
    Operator(&'a SingleOperatorDefinition),
}
291
/// Mutable counterpart of [`NodeKind`], used while resolving nodes.
#[derive(Debug)]
enum NodeKindMut<'a> {
    /// Node defined through its `path` field.
    Standard {
        /// The node's `path` value.
        path: &'a String,
        /// Source derived from the node's `git`, `branch`, `tag`, and `rev` fields.
        source: NodeSource,
        /// The node's top-level inputs.
        inputs: &'a mut BTreeMap<DataId, Input>,
    },
    /// Dora runtime node
    Runtime(&'a mut RuntimeNode),
    /// Node defined through its `custom` field.
    Custom(&'a mut CustomNode),
    /// Node defined through its single `operator` field.
    Operator(&'a mut SingleOperatorDefinition),
}