dora_core/descriptor/
mod.rs

1use dora_message::{
2    config::{Input, InputMapping, NodeRunConfig},
3    descriptor::{GitRepoRev, NodeSource},
4    id::{DataId, NodeId, OperatorId},
5};
6use eyre::{Context, OptionExt, Result, bail};
7use std::{
8    collections::{BTreeMap, HashMap},
9    env::consts::EXE_EXTENSION,
10    path::{Path, PathBuf},
11    process::Stdio,
12};
13use tokio::process::Command;
14
15// reexport for compatibility
16pub use dora_message::descriptor::{
17    CoreNodeKind, CustomNode, DYNAMIC_SOURCE, Descriptor, Node, OperatorConfig, OperatorDefinition,
18    OperatorSource, PythonSource, ResolvedNode, RuntimeNode, SHELL_SOURCE,
19    SingleOperatorDefinition,
20};
21pub use validate::ResolvedNodeExt;
22pub use visualize::collect_dora_timers;
23
24mod validate;
25mod visualize;
26
27pub trait DescriptorExt {
28    fn resolve_aliases_and_set_defaults(&self) -> eyre::Result<BTreeMap<NodeId, ResolvedNode>>;
29    fn visualize_as_mermaid(&self) -> eyre::Result<String>;
30    fn blocking_read(path: &Path) -> eyre::Result<Descriptor>;
31    fn parse(buf: Vec<u8>) -> eyre::Result<Descriptor>;
32    fn check(&self, working_dir: &Path) -> eyre::Result<()>;
33    fn check_in_daemon(&self, working_dir: &Path, coordinator_is_remote: bool) -> eyre::Result<()>;
34}
35
/// Operator ID given to the operator of a single-operator node whose `operator`
/// section does not set an explicit `id`.
pub const SINGLE_OPERATOR_DEFAULT_ID: &str = "op";
37
38impl DescriptorExt for Descriptor {
39    fn resolve_aliases_and_set_defaults(&self) -> eyre::Result<BTreeMap<NodeId, ResolvedNode>> {
40        let default_op_id = OperatorId::from(SINGLE_OPERATOR_DEFAULT_ID.to_string());
41
42        let single_operator_nodes: HashMap<_, _> = self
43            .nodes
44            .iter()
45            .filter_map(|n| {
46                n.operator
47                    .as_ref()
48                    .map(|op| (&n.id, op.id.as_ref().unwrap_or(&default_op_id)))
49            })
50            .collect();
51
52        let mut resolved = BTreeMap::new();
53        for mut node in self.nodes.clone() {
54            // adjust input mappings
55            let mut node_kind = node_kind_mut(&mut node)?;
56            let input_mappings: Vec<_> = match &mut node_kind {
57                NodeKindMut::Standard { inputs, .. } => inputs.values_mut().collect(),
58                NodeKindMut::Runtime(node) => node
59                    .operators
60                    .iter_mut()
61                    .flat_map(|op| op.config.inputs.values_mut())
62                    .collect(),
63                NodeKindMut::Custom(node) => node.run_config.inputs.values_mut().collect(),
64                NodeKindMut::Operator(operator) => operator.config.inputs.values_mut().collect(),
65            };
66            for mapping in input_mappings
67                .into_iter()
68                .filter_map(|i| match &mut i.mapping {
69                    InputMapping::Timer { .. } => None,
70                    InputMapping::User(m) => Some(m),
71                })
72            {
73                if let Some(op_name) = single_operator_nodes.get(&mapping.source).copied() {
74                    mapping.output = DataId::from(format!("{op_name}/{}", mapping.output));
75                }
76            }
77
78            // resolve nodes
79            let kind = match node_kind {
80                NodeKindMut::Standard {
81                    path,
82                    source,
83                    inputs: _,
84                } => CoreNodeKind::Custom(CustomNode {
85                    path: path.clone(),
86                    source,
87                    args: node.args,
88                    build: node.build,
89                    send_stdout_as: node.send_stdout_as,
90                    run_config: NodeRunConfig {
91                        inputs: node.inputs,
92                        outputs: node.outputs,
93                    },
94                    envs: None,
95                    restart_policy: node.restart_policy,
96                }),
97                NodeKindMut::Custom(node) => CoreNodeKind::Custom(node.clone()),
98                NodeKindMut::Runtime(node) => CoreNodeKind::Runtime(node.clone()),
99                NodeKindMut::Operator(op) => CoreNodeKind::Runtime(RuntimeNode {
100                    operators: vec![OperatorDefinition {
101                        id: op.id.clone().unwrap_or_else(|| default_op_id.clone()),
102                        config: op.config.clone(),
103                    }],
104                }),
105            };
106
107            resolved.insert(
108                node.id.clone(),
109                ResolvedNode {
110                    id: node.id,
111                    name: node.name,
112                    description: node.description,
113                    env: node.env,
114                    deploy: node.deploy,
115                    kind,
116                },
117            );
118        }
119
120        Ok(resolved)
121    }
122
123    fn visualize_as_mermaid(&self) -> eyre::Result<String> {
124        let resolved = self.resolve_aliases_and_set_defaults()?;
125        let flowchart = visualize::visualize_nodes(&resolved);
126
127        Ok(flowchart)
128    }
129
130    fn blocking_read(path: &Path) -> eyre::Result<Descriptor> {
131        let buf = std::fs::read(path).context("failed to open given file")?;
132        Descriptor::parse(buf)
133    }
134
135    fn parse(buf: Vec<u8>) -> eyre::Result<Descriptor> {
136        serde_yaml::from_slice(&buf).context("failed to parse given descriptor")
137    }
138
139    fn check(&self, working_dir: &Path) -> eyre::Result<()> {
140        validate::check_dataflow(self, working_dir, None, false)
141            .wrap_err("Dataflow could not be validated.")
142    }
143
144    fn check_in_daemon(&self, working_dir: &Path, coordinator_is_remote: bool) -> eyre::Result<()> {
145        validate::check_dataflow(self, working_dir, None, coordinator_is_remote)
146            .wrap_err("Dataflow could not be validated.")
147    }
148}
149
150pub async fn read_as_descriptor(path: &Path) -> eyre::Result<Descriptor> {
151    let buf = tokio::fs::read(path)
152        .await
153        .context("failed to open given file")?;
154    Descriptor::parse(buf)
155}
156
157fn node_kind_mut(node: &mut Node) -> eyre::Result<NodeKindMut> {
158    match node.kind()? {
159        NodeKind::Standard(_) => {
160            let source = match (&node.git, &node.branch, &node.tag, &node.rev) {
161                (None, None, None, None) => NodeSource::Local,
162                (Some(repo), branch, tag, rev) => {
163                    let rev = match (branch, tag, rev) {
164                        (None, None, None) => None,
165                        (Some(branch), None, None) => Some(GitRepoRev::Branch(branch.clone())),
166                        (None, Some(tag), None) => Some(GitRepoRev::Tag(tag.clone())),
167                        (None, None, Some(rev)) => Some(GitRepoRev::Rev(rev.clone())),
168                        other @ (_, _, _) => {
169                            eyre::bail!(
170                                "only one of `branch`, `tag`, and `rev` are allowed (got {other:?})"
171                            )
172                        }
173                    };
174                    NodeSource::GitBranch {
175                        repo: repo.clone(),
176                        rev,
177                    }
178                }
179                (None, _, _, _) => {
180                    eyre::bail!("`git` source required when using branch, tag, or rev")
181                }
182            };
183
184            Ok(NodeKindMut::Standard {
185                path: node.path.as_ref().ok_or_eyre("missing `path` attribute")?,
186                source,
187                inputs: &mut node.inputs,
188            })
189        }
190        NodeKind::Runtime(_) => node
191            .operators
192            .as_mut()
193            .map(NodeKindMut::Runtime)
194            .ok_or_eyre("no operators"),
195        NodeKind::Custom(_) => node
196            .custom
197            .as_mut()
198            .map(NodeKindMut::Custom)
199            .ok_or_eyre("no custom"),
200        NodeKind::Operator(_) => node
201            .operator
202            .as_mut()
203            .map(NodeKindMut::Operator)
204            .ok_or_eyre("no operator"),
205    }
206}
207
/// Returns `true` if `source` looks like a URL, i.e. contains a `://` scheme separator
/// (as in `https://example.com/node.zip`).
///
/// The check is purely textual; the URL is neither parsed nor validated.
pub fn source_is_url(source: &str) -> bool {
    const SCHEME_SEPARATOR: &str = "://";
    source.contains(SCHEME_SEPARATOR)
}
211
212pub fn resolve_path(source: &str, working_dir: &Path) -> Result<PathBuf> {
213    let path = Path::new(&source);
214    let path = if path.extension().is_none() {
215        path.with_extension(EXE_EXTENSION)
216    } else {
217        path.to_owned()
218    };
219
220    // Search path within current working directory
221    if let Ok(abs_path) = working_dir.join(&path).canonicalize() {
222        Ok(abs_path)
223    // Search path within $PATH
224    } else if which::which("uv").is_ok() {
225        // spawn: uv run which <path>
226        let which = if cfg!(windows) { "where" } else { "which" };
227        let _output = Command::new("uv")
228            .arg("run")
229            .arg(which)
230            .arg(&path)
231            .stdout(Stdio::null())
232            .spawn()
233            .context("Could not find binary within uv")?;
234        Ok(path)
235    } else if let Ok(abs_path) = which::which(&path) {
236        Ok(abs_path)
237    } else {
238        bail!("Could not find source path {}", path.display())
239    }
240}
241
242pub trait NodeExt {
243    fn kind(&self) -> eyre::Result<NodeKind>;
244}
245
246impl NodeExt for Node {
247    fn kind(&self) -> eyre::Result<NodeKind> {
248        match (&self.path, &self.operators, &self.custom, &self.operator) {
249            (None, None, None, None) => {
250                eyre::bail!(
251                    "node `{}` requires a `path`, `custom`, or `operators` field",
252                    self.id
253                )
254            }
255            (None, None, None, Some(operator)) => Ok(NodeKind::Operator(operator)),
256            (None, None, Some(custom), None) => Ok(NodeKind::Custom(custom)),
257            (None, Some(runtime), None, None) => Ok(NodeKind::Runtime(runtime)),
258            (Some(path), None, None, None) => Ok(NodeKind::Standard(path)),
259            _ => {
260                eyre::bail!(
261                    "node `{}` has multiple exclusive fields set, only one of `path`, `custom`, `operators` and `operator` is allowed",
262                    self.id
263                )
264            }
265        }
266    }
267}
268
269#[derive(Debug)]
270pub enum NodeKind<'a> {
271    Standard(&'a String),
272    /// Dora runtime node
273    Runtime(&'a RuntimeNode),
274    Custom(&'a CustomNode),
275    Operator(&'a SingleOperatorDefinition),
276}
277
278#[derive(Debug)]
279enum NodeKindMut<'a> {
280    Standard {
281        path: &'a String,
282        source: NodeSource,
283        inputs: &'a mut BTreeMap<DataId, Input>,
284    },
285    /// Dora runtime node
286    Runtime(&'a mut RuntimeNode),
287    Custom(&'a mut CustomNode),
288    Operator(&'a mut SingleOperatorDefinition),
289}