1use dora_message::{
2 config::{Input, InputMapping, NodeRunConfig},
3 descriptor::{GitRepoRev, NodeSource},
4 id::{DataId, NodeId, OperatorId},
5};
6use eyre::{Context, OptionExt, Result, bail};
7use std::{
8 collections::{BTreeMap, HashMap},
9 env::consts::EXE_EXTENSION,
10 path::{Path, PathBuf},
11 process::Stdio,
12};
13use tokio::process::Command;
14
15pub use dora_message::descriptor::{
17 CoreNodeKind, CustomNode, DYNAMIC_SOURCE, Descriptor, Node, OperatorConfig, OperatorDefinition,
18 OperatorSource, PythonSource, ResolvedNode, RuntimeNode, SHELL_SOURCE,
19 SingleOperatorDefinition,
20};
21pub use validate::ResolvedNodeExt;
22pub use visualize::collect_dora_timers;
23
24mod validate;
25mod visualize;
26
27pub trait DescriptorExt {
28 fn resolve_aliases_and_set_defaults(&self) -> eyre::Result<BTreeMap<NodeId, ResolvedNode>>;
29 fn visualize_as_mermaid(&self) -> eyre::Result<String>;
30 fn blocking_read(path: &Path) -> eyre::Result<Descriptor>;
31 fn parse(buf: Vec<u8>) -> eyre::Result<Descriptor>;
32 fn check(&self, working_dir: &Path) -> eyre::Result<()>;
33 fn check_in_daemon(&self, working_dir: &Path, coordinator_is_remote: bool) -> eyre::Result<()>;
34}
35
/// Operator ID used for a single-operator node whose operator definition does
/// not specify an `id` of its own.
pub const SINGLE_OPERATOR_DEFAULT_ID: &str = "op";
37
38impl DescriptorExt for Descriptor {
39 fn resolve_aliases_and_set_defaults(&self) -> eyre::Result<BTreeMap<NodeId, ResolvedNode>> {
40 let default_op_id = OperatorId::from(SINGLE_OPERATOR_DEFAULT_ID.to_string());
41
42 let single_operator_nodes: HashMap<_, _> = self
43 .nodes
44 .iter()
45 .filter_map(|n| {
46 n.operator
47 .as_ref()
48 .map(|op| (&n.id, op.id.as_ref().unwrap_or(&default_op_id)))
49 })
50 .collect();
51
52 let mut resolved = BTreeMap::new();
53 for mut node in self.nodes.clone() {
54 let mut node_kind = node_kind_mut(&mut node)?;
56 let input_mappings: Vec<_> = match &mut node_kind {
57 NodeKindMut::Standard { inputs, .. } => inputs.values_mut().collect(),
58 NodeKindMut::Runtime(node) => node
59 .operators
60 .iter_mut()
61 .flat_map(|op| op.config.inputs.values_mut())
62 .collect(),
63 NodeKindMut::Custom(node) => node.run_config.inputs.values_mut().collect(),
64 NodeKindMut::Operator(operator) => operator.config.inputs.values_mut().collect(),
65 };
66 for mapping in input_mappings
67 .into_iter()
68 .filter_map(|i| match &mut i.mapping {
69 InputMapping::Timer { .. } => None,
70 InputMapping::User(m) => Some(m),
71 })
72 {
73 if let Some(op_name) = single_operator_nodes.get(&mapping.source).copied() {
74 mapping.output = DataId::from(format!("{op_name}/{}", mapping.output));
75 }
76 }
77
78 let kind = match node_kind {
80 NodeKindMut::Standard {
81 path,
82 source,
83 inputs: _,
84 } => CoreNodeKind::Custom(CustomNode {
85 path: path.clone(),
86 source,
87 args: node.args,
88 build: node.build,
89 send_stdout_as: node.send_stdout_as,
90 run_config: NodeRunConfig {
91 inputs: node.inputs,
92 outputs: node.outputs,
93 },
94 envs: None,
95 restart_policy: node.restart_policy,
96 }),
97 NodeKindMut::Custom(node) => CoreNodeKind::Custom(node.clone()),
98 NodeKindMut::Runtime(node) => CoreNodeKind::Runtime(node.clone()),
99 NodeKindMut::Operator(op) => CoreNodeKind::Runtime(RuntimeNode {
100 operators: vec![OperatorDefinition {
101 id: op.id.clone().unwrap_or_else(|| default_op_id.clone()),
102 config: op.config.clone(),
103 }],
104 }),
105 };
106
107 resolved.insert(
108 node.id.clone(),
109 ResolvedNode {
110 id: node.id,
111 name: node.name,
112 description: node.description,
113 env: node.env,
114 deploy: node.deploy,
115 kind,
116 },
117 );
118 }
119
120 Ok(resolved)
121 }
122
123 fn visualize_as_mermaid(&self) -> eyre::Result<String> {
124 let resolved = self.resolve_aliases_and_set_defaults()?;
125 let flowchart = visualize::visualize_nodes(&resolved);
126
127 Ok(flowchart)
128 }
129
130 fn blocking_read(path: &Path) -> eyre::Result<Descriptor> {
131 let buf = std::fs::read(path).context("failed to open given file")?;
132 Descriptor::parse(buf)
133 }
134
135 fn parse(buf: Vec<u8>) -> eyre::Result<Descriptor> {
136 serde_yaml::from_slice(&buf).context("failed to parse given descriptor")
137 }
138
139 fn check(&self, working_dir: &Path) -> eyre::Result<()> {
140 validate::check_dataflow(self, working_dir, None, false)
141 .wrap_err("Dataflow could not be validated.")
142 }
143
144 fn check_in_daemon(&self, working_dir: &Path, coordinator_is_remote: bool) -> eyre::Result<()> {
145 validate::check_dataflow(self, working_dir, None, coordinator_is_remote)
146 .wrap_err("Dataflow could not be validated.")
147 }
148}
149
150pub async fn read_as_descriptor(path: &Path) -> eyre::Result<Descriptor> {
151 let buf = tokio::fs::read(path)
152 .await
153 .context("failed to open given file")?;
154 Descriptor::parse(buf)
155}
156
157fn node_kind_mut(node: &mut Node) -> eyre::Result<NodeKindMut> {
158 match node.kind()? {
159 NodeKind::Standard(_) => {
160 let source = match (&node.git, &node.branch, &node.tag, &node.rev) {
161 (None, None, None, None) => NodeSource::Local,
162 (Some(repo), branch, tag, rev) => {
163 let rev = match (branch, tag, rev) {
164 (None, None, None) => None,
165 (Some(branch), None, None) => Some(GitRepoRev::Branch(branch.clone())),
166 (None, Some(tag), None) => Some(GitRepoRev::Tag(tag.clone())),
167 (None, None, Some(rev)) => Some(GitRepoRev::Rev(rev.clone())),
168 other @ (_, _, _) => {
169 eyre::bail!(
170 "only one of `branch`, `tag`, and `rev` are allowed (got {other:?})"
171 )
172 }
173 };
174 NodeSource::GitBranch {
175 repo: repo.clone(),
176 rev,
177 }
178 }
179 (None, _, _, _) => {
180 eyre::bail!("`git` source required when using branch, tag, or rev")
181 }
182 };
183
184 Ok(NodeKindMut::Standard {
185 path: node.path.as_ref().ok_or_eyre("missing `path` attribute")?,
186 source,
187 inputs: &mut node.inputs,
188 })
189 }
190 NodeKind::Runtime(_) => node
191 .operators
192 .as_mut()
193 .map(NodeKindMut::Runtime)
194 .ok_or_eyre("no operators"),
195 NodeKind::Custom(_) => node
196 .custom
197 .as_mut()
198 .map(NodeKindMut::Custom)
199 .ok_or_eyre("no custom"),
200 NodeKind::Operator(_) => node
201 .operator
202 .as_mut()
203 .map(NodeKindMut::Operator)
204 .ok_or_eyre("no operator"),
205 }
206}
207
/// Reports whether `source` looks like a URL, i.e. whether it contains the
/// `://` separator anywhere in the string.
pub fn source_is_url(source: &str) -> bool {
    source.find("://").is_some()
}
211
212pub fn resolve_path(source: &str, working_dir: &Path) -> Result<PathBuf> {
213 let path = Path::new(&source);
214 let path = if path.extension().is_none() {
215 path.with_extension(EXE_EXTENSION)
216 } else {
217 path.to_owned()
218 };
219
220 if let Ok(abs_path) = working_dir.join(&path).canonicalize() {
222 Ok(abs_path)
223 } else if which::which("uv").is_ok() {
225 let which = if cfg!(windows) { "where" } else { "which" };
227 let _output = Command::new("uv")
228 .arg("run")
229 .arg(which)
230 .arg(&path)
231 .stdout(Stdio::null())
232 .spawn()
233 .context("Could not find binary within uv")?;
234 Ok(path)
235 } else if let Ok(abs_path) = which::which(&path) {
236 Ok(abs_path)
237 } else {
238 bail!("Could not find source path {}", path.display())
239 }
240}
241
242pub trait NodeExt {
243 fn kind(&self) -> eyre::Result<NodeKind>;
244}
245
246impl NodeExt for Node {
247 fn kind(&self) -> eyre::Result<NodeKind> {
248 match (&self.path, &self.operators, &self.custom, &self.operator) {
249 (None, None, None, None) => {
250 eyre::bail!(
251 "node `{}` requires a `path`, `custom`, or `operators` field",
252 self.id
253 )
254 }
255 (None, None, None, Some(operator)) => Ok(NodeKind::Operator(operator)),
256 (None, None, Some(custom), None) => Ok(NodeKind::Custom(custom)),
257 (None, Some(runtime), None, None) => Ok(NodeKind::Runtime(runtime)),
258 (Some(path), None, None, None) => Ok(NodeKind::Standard(path)),
259 _ => {
260 eyre::bail!(
261 "node `{}` has multiple exclusive fields set, only one of `path`, `custom`, `operators` and `operator` is allowed",
262 self.id
263 )
264 }
265 }
266 }
267}
268
/// Borrowed view of the single field that determines a node's kind, as
/// returned by [`NodeExt::kind`].
#[derive(Debug)]
pub enum NodeKind<'a> {
    /// The node sets `path`; holds a reference to that value.
    Standard(&'a String),
    /// The node sets `operators`; holds a reference to the runtime node definition.
    Runtime(&'a RuntimeNode),
    /// The node sets `custom`; holds a reference to the custom node definition.
    Custom(&'a CustomNode),
    /// The node sets `operator`; holds a reference to the single operator definition.
    Operator(&'a SingleOperatorDefinition),
}
277
/// Mutable counterpart of [`NodeKind`], produced by `node_kind_mut`.
#[derive(Debug)]
enum NodeKindMut<'a> {
    /// A `path`-based node.
    Standard {
        /// The node's `path` value.
        path: &'a String,
        /// Source derived from the node's `git`, `branch`, `tag`, and `rev` fields.
        source: NodeSource,
        /// Mutable access to the node's inputs.
        inputs: &'a mut BTreeMap<DataId, Input>,
    },
    /// Mutable access to the node's `operators` definition.
    Runtime(&'a mut RuntimeNode),
    /// Mutable access to the node's `custom` definition.
    Custom(&'a mut CustomNode),
    /// Mutable access to the node's `operator` definition.
    Operator(&'a mut SingleOperatorDefinition),
}