1use dora_message::{
2 config::{Input, InputMapping, NodeRunConfig},
3 descriptor::{GitRepoRev, NodeSource},
4 id::{DataId, NodeId, OperatorId},
5};
6use eyre::{Context, OptionExt, Result, bail};
7use std::{
8 collections::{BTreeMap, HashMap},
9 env::consts::EXE_EXTENSION,
10 path::{Path, PathBuf},
11 process::Stdio,
12};
13
14pub use dora_message::descriptor::{
16 CoreNodeKind, CustomNode, DYNAMIC_SOURCE, Descriptor, Node, OperatorConfig, OperatorDefinition,
17 OperatorSource, PythonSource, ResolvedNode, RuntimeNode, SHELL_SOURCE,
18 SingleOperatorDefinition,
19};
20pub use validate::ResolvedNodeExt;
21pub use visualize::collect_dora_timers;
22
23mod validate;
24mod visualize;
25
26pub trait DescriptorExt {
27 fn resolve_aliases_and_set_defaults(&self) -> eyre::Result<BTreeMap<NodeId, ResolvedNode>>;
28 fn visualize_as_mermaid(&self) -> eyre::Result<String>;
29 fn blocking_read(path: &Path) -> eyre::Result<Descriptor>;
30 fn parse(buf: Vec<u8>) -> eyre::Result<Descriptor>;
31 fn check(&self, working_dir: &Path) -> eyre::Result<()>;
32 fn check_in_daemon(&self, working_dir: &Path, coordinator_is_remote: bool) -> eyre::Result<()>;
33}
34
35pub const SINGLE_OPERATOR_DEFAULT_ID: &str = "op";
36
37impl DescriptorExt for Descriptor {
38 fn resolve_aliases_and_set_defaults(&self) -> eyre::Result<BTreeMap<NodeId, ResolvedNode>> {
39 let default_op_id = OperatorId::from(SINGLE_OPERATOR_DEFAULT_ID.to_string());
40
41 let single_operator_nodes: HashMap<_, _> = self
42 .nodes
43 .iter()
44 .filter_map(|n| {
45 n.operator
46 .as_ref()
47 .map(|op| (&n.id, op.id.as_ref().unwrap_or(&default_op_id)))
48 })
49 .collect();
50
51 let mut resolved = BTreeMap::new();
52 for mut node in self.nodes.clone() {
53 let mut node_kind = node_kind_mut(&mut node)?;
55 let input_mappings: Vec<_> = match &mut node_kind {
56 NodeKindMut::Standard { inputs, .. } => inputs.values_mut().collect(),
57 NodeKindMut::Runtime(node) => node
58 .operators
59 .iter_mut()
60 .flat_map(|op| op.config.inputs.values_mut())
61 .collect(),
62 NodeKindMut::Custom(node) => node.run_config.inputs.values_mut().collect(),
63 NodeKindMut::Operator(operator) => operator.config.inputs.values_mut().collect(),
64 };
65 for mapping in input_mappings
66 .into_iter()
67 .filter_map(|i| match &mut i.mapping {
68 InputMapping::Timer { .. } => None,
69 InputMapping::User(m) => Some(m),
70 })
71 {
72 if let Some(op_name) = single_operator_nodes.get(&mapping.source).copied() {
73 mapping.output = DataId::from(format!("{op_name}/{}", mapping.output));
74 }
75 }
76
77 let kind = match node_kind {
79 NodeKindMut::Standard {
80 path,
81 source,
82 inputs: _,
83 } => CoreNodeKind::Custom(CustomNode {
84 path: path.clone(),
85 source,
86 args: node.args,
87 build: node.build,
88 send_stdout_as: node.send_stdout_as,
89 run_config: NodeRunConfig {
90 inputs: node.inputs,
91 outputs: node.outputs,
92 },
93 envs: None,
94 restart_policy: node.restart_policy,
95 }),
96 NodeKindMut::Custom(node) => CoreNodeKind::Custom(node.clone()),
97 NodeKindMut::Runtime(node) => CoreNodeKind::Runtime(node.clone()),
98 NodeKindMut::Operator(op) => CoreNodeKind::Runtime(RuntimeNode {
99 operators: vec![OperatorDefinition {
100 id: op.id.clone().unwrap_or_else(|| default_op_id.clone()),
101 config: op.config.clone(),
102 }],
103 }),
104 };
105
106 let env = match (self.env.clone(), node.env) {
107 (None, node_env) => node_env,
108 (Some(mut self_env), node_env) => {
109 self_env.extend(node_env.unwrap_or_default());
110 Some(self_env)
111 }
112 };
113
114 resolved.insert(
115 node.id.clone(),
116 ResolvedNode {
117 id: node.id,
118 name: node.name,
119 description: node.description,
120 env,
121 deploy: node.deploy,
122 kind,
123 },
124 );
125 }
126
127 Ok(resolved)
128 }
129
130 fn visualize_as_mermaid(&self) -> eyre::Result<String> {
131 let resolved = self.resolve_aliases_and_set_defaults()?;
132 let flowchart = visualize::visualize_nodes(&resolved);
133
134 Ok(flowchart)
135 }
136
137 fn blocking_read(path: &Path) -> eyre::Result<Descriptor> {
138 let buf = std::fs::read(path).context("failed to open given file")?;
139 Descriptor::parse(buf)
140 }
141
142 fn parse(buf: Vec<u8>) -> eyre::Result<Descriptor> {
143 serde_yaml::from_slice(&buf).context("failed to parse given descriptor")
144 }
145
146 fn check(&self, working_dir: &Path) -> eyre::Result<()> {
147 validate::check_dataflow(self, working_dir, None, false)
148 .wrap_err("Dataflow could not be validated.")
149 }
150
151 fn check_in_daemon(&self, working_dir: &Path, coordinator_is_remote: bool) -> eyre::Result<()> {
152 validate::check_dataflow(self, working_dir, None, coordinator_is_remote)
153 .wrap_err("Dataflow could not be validated.")
154 }
155}
156
157pub async fn read_as_descriptor(path: &Path) -> eyre::Result<Descriptor> {
158 let buf = tokio::fs::read(path)
159 .await
160 .context("failed to open given file")?;
161 Descriptor::parse(buf)
162}
163
164fn node_kind_mut(node: &mut Node) -> eyre::Result<NodeKindMut> {
165 match node.kind()? {
166 NodeKind::Standard(_) => {
167 let source = match (&node.git, &node.branch, &node.tag, &node.rev) {
168 (None, None, None, None) => NodeSource::Local,
169 (Some(repo), branch, tag, rev) => {
170 let rev = match (branch, tag, rev) {
171 (None, None, None) => None,
172 (Some(branch), None, None) => Some(GitRepoRev::Branch(branch.clone())),
173 (None, Some(tag), None) => Some(GitRepoRev::Tag(tag.clone())),
174 (None, None, Some(rev)) => Some(GitRepoRev::Rev(rev.clone())),
175 other @ (_, _, _) => {
176 eyre::bail!(
177 "only one of `branch`, `tag`, and `rev` are allowed (got {other:?})"
178 )
179 }
180 };
181 NodeSource::GitBranch {
182 repo: repo.clone(),
183 rev,
184 }
185 }
186 (None, _, _, _) => {
187 eyre::bail!("`git` source required when using branch, tag, or rev")
188 }
189 };
190
191 Ok(NodeKindMut::Standard {
192 path: node.path.as_ref().ok_or_eyre("missing `path` attribute")?,
193 source,
194 inputs: &mut node.inputs,
195 })
196 }
197 NodeKind::Runtime(_) => node
198 .operators
199 .as_mut()
200 .map(NodeKindMut::Runtime)
201 .ok_or_eyre("no operators"),
202 NodeKind::Custom(_) => node
203 .custom
204 .as_mut()
205 .map(NodeKindMut::Custom)
206 .ok_or_eyre("no custom"),
207 NodeKind::Operator(_) => node
208 .operator
209 .as_mut()
210 .map(NodeKindMut::Operator)
211 .ok_or_eyre("no operator"),
212 }
213}
214
/// Returns `true` when `source` contains a `://` scheme separator, i.e. looks like a URL
/// rather than a local path.
pub fn source_is_url(source: &str) -> bool {
    source.split_once("://").is_some()
}
218
219pub fn resolve_path(source: &str, working_dir: &Path) -> Result<PathBuf> {
220 let path = Path::new(&source);
221 let path = if path.extension().is_none() {
222 path.with_extension(EXE_EXTENSION)
223 } else {
224 path.to_owned()
225 };
226
227 if let Ok(abs_path) = working_dir.join(&path).canonicalize() {
229 Ok(abs_path)
230 } else if which::which("uv").is_ok() {
232 let which = if cfg!(windows) { "where" } else { "which" };
234 let output = std::process::Command::new("uv")
235 .arg("run")
236 .arg(which)
237 .arg(&path)
238 .stdout(Stdio::null())
239 .stderr(Stdio::null())
240 .output()
241 .context("Could not run `uv run` to find binary")?;
242 if output.status.success() {
243 Ok(path)
244 } else if let Ok(abs_path) = which::which(&path) {
245 Ok(abs_path)
246 } else {
247 bail!("Could not find source path {}", path.display())
248 }
249 } else if let Ok(abs_path) = which::which(&path) {
250 Ok(abs_path)
251 } else {
252 bail!("Could not find source path {}", path.display())
253 }
254}
255
256pub trait NodeExt {
257 fn kind(&self) -> eyre::Result<NodeKind>;
258}
259
260impl NodeExt for Node {
261 fn kind(&self) -> eyre::Result<NodeKind> {
262 match (&self.path, &self.operators, &self.custom, &self.operator) {
263 (None, None, None, None) => {
264 eyre::bail!(
265 "node `{}` requires a `path`, `custom`, or `operators` field",
266 self.id
267 )
268 }
269 (None, None, None, Some(operator)) => Ok(NodeKind::Operator(operator)),
270 (None, None, Some(custom), None) => Ok(NodeKind::Custom(custom)),
271 (None, Some(runtime), None, None) => Ok(NodeKind::Runtime(runtime)),
272 (Some(path), None, None, None) => Ok(NodeKind::Standard(path)),
273 _ => {
274 eyre::bail!(
275 "node `{}` has multiple exclusive fields set, only one of `path`, `custom`, `operators` and `operator` is allowed",
276 self.id
277 )
278 }
279 }
280 }
281}
282
283#[derive(Debug)]
284pub enum NodeKind<'a> {
285 Standard(&'a String),
286 Runtime(&'a RuntimeNode),
288 Custom(&'a CustomNode),
289 Operator(&'a SingleOperatorDefinition),
290}
291
292#[derive(Debug)]
293enum NodeKindMut<'a> {
294 Standard {
295 path: &'a String,
296 source: NodeSource,
297 inputs: &'a mut BTreeMap<DataId, Input>,
298 },
299 Runtime(&'a mut RuntimeNode),
301 Custom(&'a mut CustomNode),
302 Operator(&'a mut SingleOperatorDefinition),
303}