use dora_message::{
config::{Input, InputMapping, NodeRunConfig},
descriptor::{GitRepoRev, NodeSource},
id::{DataId, NodeId, OperatorId},
};
use eyre::{Context, OptionExt, Result, bail};
use std::{
collections::{BTreeMap, HashMap},
env::consts::EXE_EXTENSION,
path::{Path, PathBuf},
process::Stdio,
};
pub use dora_message::descriptor::{
CoreNodeKind, CustomNode, DYNAMIC_SOURCE, Descriptor, Node, OperatorConfig, OperatorDefinition,
OperatorSource, PythonSource, ResolvedNode, RuntimeNode, SHELL_SOURCE,
SingleOperatorDefinition,
};
pub use validate::ResolvedNodeExt;
pub use visualize::collect_dora_timers;
mod validate;
mod visualize;
/// Extension methods for [`Descriptor`]: resolving nodes, rendering a flowchart,
/// reading/parsing a descriptor, and validating a dataflow.
///
/// The per-method notes describe the implementation for [`Descriptor`] in this file.
pub trait DescriptorExt {
/// Converts every node of the descriptor into a [`ResolvedNode`], keyed by node ID.
fn resolve_aliases_and_set_defaults(&self) -> eyre::Result<BTreeMap<NodeId, ResolvedNode>>;
/// Resolves the nodes and returns the string produced by `visualize::visualize_nodes`.
fn visualize_as_mermaid(&self) -> eyre::Result<String>;
/// Synchronously reads the file at `path` and parses its contents via [`DescriptorExt::parse`].
fn blocking_read(path: &Path) -> eyre::Result<Descriptor>;
/// Deserializes a descriptor from the given bytes (YAML, via `serde_yaml`).
fn parse(buf: Vec<u8>) -> eyre::Result<Descriptor>;
/// Validates the dataflow relative to `working_dir`.
fn check(&self, working_dir: &Path) -> eyre::Result<()>;
/// Validates the dataflow relative to `working_dir`, forwarding `coordinator_is_remote`
/// to the validation routine.
fn check_in_daemon(&self, working_dir: &Path, coordinator_is_remote: bool) -> eyre::Result<()>;
}
/// Operator ID used for a node's single `operator` when that operator does not set its own `id`.
pub const SINGLE_OPERATOR_DEFAULT_ID: &str = "op";
impl DescriptorExt for Descriptor {
/// Builds a [`ResolvedNode`] for every node in `self.nodes`, keyed by node ID.
///
/// Each node is cloned, its user input mappings are rewritten when they refer to a
/// node that has a single `operator`, and the node is normalized into a [`CoreNodeKind`].
///
/// # Errors
///
/// Returns an error if `node_kind_mut` fails for any node.
fn resolve_aliases_and_set_defaults(&self) -> eyre::Result<BTreeMap<NodeId, ResolvedNode>> {
// Fallback operator ID for `operator` nodes that do not specify an `id`.
let default_op_id = OperatorId::from(SINGLE_OPERATOR_DEFAULT_ID.to_string());
// Maps the ID of every node that has an `operator` field to that operator's ID
// (or to the default ID when none is given).
let single_operator_nodes: HashMap<_, _> = self
.nodes
.iter()
.filter_map(|n| {
n.operator
.as_ref()
.map(|op| (&n.id, op.id.as_ref().unwrap_or(&default_op_id)))
})
.collect();
let mut resolved = BTreeMap::new();
// Iterate over clones so that the nodes can be mutated and consumed without touching `self`.
for mut node in self.nodes.clone() {
let mut node_kind = node_kind_mut(&mut node)?;
// Gather mutable references to all input definitions of this node, whatever its kind.
let input_mappings: Vec<_> = match &mut node_kind {
NodeKindMut::Standard { inputs, .. } => inputs.values_mut().collect(),
NodeKindMut::Runtime(node) => node
.operators
.iter_mut()
.flat_map(|op| op.config.inputs.values_mut())
.collect(),
NodeKindMut::Custom(node) => node.run_config.inputs.values_mut().collect(),
NodeKindMut::Operator(operator) => operator.config.inputs.values_mut().collect(),
};
// Only user mappings are rewritten; timer mappings are skipped.
for mapping in input_mappings
.into_iter()
.filter_map(|i| match &mut i.mapping {
InputMapping::Timer { .. } => None,
InputMapping::User(m) => Some(m),
})
{
// If the source node has a single `operator`, prefix the referenced output
// with that operator's ID, yielding `<operator id>/<output>`.
if let Some(op_name) = single_operator_nodes.get(&mapping.source).copied() {
mapping.output = DataId::from(format!("{op_name}/{}", mapping.output));
}
}
// Normalize the node into one of the two core kinds (custom or runtime).
let kind = match node_kind {
// A `path`-based node becomes a `CustomNode` assembled from the node's top-level fields.
// NOTE(review): `path` still borrows from `node`; it appears that `path.clone()` has to
// stay ahead of the `node.*` moves below for this to borrow-check — confirm before reordering.
NodeKindMut::Standard {
path,
source,
inputs: _,
} => CoreNodeKind::Custom(CustomNode {
path: path.clone(),
source,
args: node.args,
build: node.build,
send_stdout_as: node.send_stdout_as,
run_config: NodeRunConfig {
inputs: node.inputs,
outputs: node.outputs,
},
envs: None,
restart_policy: node.restart_policy,
}),
NodeKindMut::Custom(node) => CoreNodeKind::Custom(node.clone()),
NodeKindMut::Runtime(node) => CoreNodeKind::Runtime(node.clone()),
// A single `operator` is wrapped into a runtime node with exactly one operator definition.
NodeKindMut::Operator(op) => CoreNodeKind::Runtime(RuntimeNode {
operators: vec![OperatorDefinition {
id: op.id.clone().unwrap_or_else(|| default_op_id.clone()),
config: op.config.clone(),
}],
}),
};
// Combine the descriptor-level `env` with the node's own `env`. The node's entries are
// added last via `extend`; presumably `env` is a map, in which case node-level values
// override descriptor-level values for the same key — verify against the `env` type.
let env = match (self.env.clone(), node.env) {
(None, node_env) => node_env,
(Some(mut self_env), node_env) => {
self_env.extend(node_env.unwrap_or_default());
Some(self_env)
}
};
resolved.insert(
node.id.clone(),
ResolvedNode {
id: node.id,
name: node.name,
description: node.description,
env,
deploy: node.deploy,
kind,
},
);
}
Ok(resolved)
}
/// Resolves the nodes and returns the flowchart string produced by
/// `visualize::visualize_nodes`.
///
/// # Errors
///
/// Returns an error if resolving the nodes fails.
fn visualize_as_mermaid(&self) -> eyre::Result<String> {
let resolved = self.resolve_aliases_and_set_defaults()?;
let flowchart = visualize::visualize_nodes(&resolved);
Ok(flowchart)
}
/// Reads the whole file at `path` with `std::fs::read` (blocking) and parses it.
///
/// # Errors
///
/// Returns an error if the file cannot be read or its contents cannot be parsed.
fn blocking_read(path: &Path) -> eyre::Result<Descriptor> {
let buf = std::fs::read(path).context("failed to open given file")?;
Descriptor::parse(buf)
}
/// Deserializes a [`Descriptor`] from `buf` using `serde_yaml::from_slice`.
///
/// # Errors
///
/// Returns an error if deserialization fails.
fn parse(buf: Vec<u8>) -> eyre::Result<Descriptor> {
serde_yaml::from_slice(&buf).context("failed to parse given descriptor")
}
/// Validates the dataflow via `validate::check_dataflow`, passing `None` as the third
/// argument and `false` for the flag that `check_in_daemon` fills with
/// `coordinator_is_remote`.
///
/// # Errors
///
/// Returns the validation error wrapped with "Dataflow could not be validated.".
fn check(&self, working_dir: &Path) -> eyre::Result<()> {
validate::check_dataflow(self, working_dir, None, false)
.wrap_err("Dataflow could not be validated.")
}
/// Same as `check`, but forwards `coordinator_is_remote` to `validate::check_dataflow`.
///
/// # Errors
///
/// Returns the validation error wrapped with "Dataflow could not be validated.".
fn check_in_daemon(&self, working_dir: &Path, coordinator_is_remote: bool) -> eyre::Result<()> {
validate::check_dataflow(self, working_dir, None, coordinator_is_remote)
.wrap_err("Dataflow could not be validated.")
}
}
/// Asynchronously reads the file at `path` and parses its contents into a [`Descriptor`].
///
/// # Errors
///
/// Returns an error if the file cannot be read or if parsing the descriptor fails.
pub async fn read_as_descriptor(path: &Path) -> eyre::Result<Descriptor> {
    tokio::fs::read(path)
        .await
        .context("failed to open given file")
        .and_then(Descriptor::parse)
}
/// Determines the kind of `node` and returns mutable access to the matching part of it.
///
/// For a standard (`path`-based) node, the `git`, `branch`, `tag`, and `rev` fields are
/// folded into a [`NodeSource`]:
///
/// - none of them set: [`NodeSource::Local`];
/// - `git` set with at most one of `branch`/`tag`/`rev`: `NodeSource::GitBranch` with the
///   corresponding optional [`GitRepoRev`].
///
/// # Errors
///
/// Returns an error if
/// - `node.kind()` fails,
/// - more than one of `branch`, `tag`, and `rev` is set,
/// - `branch`, `tag`, or `rev` is set without `git`,
/// - the field belonging to the detected kind is unexpectedly missing.
fn node_kind_mut(node: &mut Node) -> eyre::Result<NodeKindMut<'_>> {
    // The payload of `NodeKind` is ignored in every arm, so the shared borrow taken by
    // `kind()` does not conflict with the mutable accesses below.
    match node.kind()? {
        NodeKind::Standard(_) => {
            let source = match (&node.git, &node.branch, &node.tag, &node.rev) {
                (None, None, None, None) => NodeSource::Local,
                (Some(repo), branch, tag, rev) => {
                    // At most one revision selector may accompany the repository.
                    let rev = match (branch, tag, rev) {
                        (None, None, None) => None,
                        (Some(branch), None, None) => Some(GitRepoRev::Branch(branch.clone())),
                        (None, Some(tag), None) => Some(GitRepoRev::Tag(tag.clone())),
                        (None, None, Some(rev)) => Some(GitRepoRev::Rev(rev.clone())),
                        other @ (_, _, _) => {
                            eyre::bail!(
                                "only one of `branch`, `tag`, and `rev` are allowed (got {other:?})"
                            )
                        }
                    };
                    NodeSource::GitBranch {
                        repo: repo.clone(),
                        rev,
                    }
                }
                (None, _, _, _) => {
                    eyre::bail!("`git` source required when using branch, tag, or rev")
                }
            };
            Ok(NodeKindMut::Standard {
                path: node.path.as_ref().ok_or_eyre("missing `path` attribute")?,
                source,
                inputs: &mut node.inputs,
            })
        }
        NodeKind::Runtime(_) => node
            .operators
            .as_mut()
            .map(NodeKindMut::Runtime)
            .ok_or_eyre("no operators"),
        NodeKind::Custom(_) => node
            .custom
            .as_mut()
            .map(NodeKindMut::Custom)
            .ok_or_eyre("no custom"),
        NodeKind::Operator(_) => node
            .operator
            .as_mut()
            .map(NodeKindMut::Operator)
            .ok_or_eyre("no operator"),
    }
}
/// Returns `true` if `source` contains the scheme separator `://` anywhere in the string.
pub fn source_is_url(source: &str) -> bool {
    const SCHEME_SEPARATOR: &str = "://";
    source.contains(SCHEME_SEPARATOR)
}
/// Resolves `source` to the path of an existing file or executable.
///
/// If `source` has no extension, the platform's executable extension
/// ([`EXE_EXTENSION`]) is applied first. The lookup then proceeds in this order:
///
/// 1. the path joined onto `working_dir`, canonicalized;
/// 2. if a `uv` executable is available, `uv run which <path>` (`where` on Windows);
///    on success the path is returned as given, without being made absolute;
/// 3. a regular executable lookup via `which::which`.
///
/// # Errors
///
/// Returns an error if the `uv` command cannot be run or if none of the lookups
/// finds the path.
pub fn resolve_path(source: &str, working_dir: &Path) -> Result<PathBuf> {
    let raw = Path::new(source);
    let candidate = match raw.extension() {
        Some(_) => raw.to_owned(),
        None => raw.with_extension(EXE_EXTENSION),
    };

    // 1. Look relative to the working directory.
    if let Ok(canonical) = working_dir.join(&candidate).canonicalize() {
        return Ok(canonical);
    }

    // 2. Ask `uv` whether the binary is visible inside its environment.
    if which::which("uv").is_ok() {
        let lookup_tool = if cfg!(windows) { "where" } else { "which" };
        let probe = std::process::Command::new("uv")
            .arg("run")
            .arg(lookup_tool)
            .arg(&candidate)
            .stdout(Stdio::null())
            .stderr(Stdio::null())
            .output()
            .context("Could not run `uv run` to find binary")?;
        if probe.status.success() {
            return Ok(candidate);
        }
    }

    // 3. Fall back to a plain executable lookup.
    match which::which(&candidate) {
        Ok(found) => Ok(found),
        Err(_) => bail!("Could not find source path {}", candidate.display()),
    }
}
/// Extension trait for determining which kind of node a [`Node`] definition describes.
pub trait NodeExt {
    /// Returns the node's kind, borrowing the field that defines it.
    ///
    /// # Errors
    ///
    /// The implementation for [`Node`] in this file fails when none, or more than one,
    /// of the `path`, `operators`, `custom`, and `operator` fields is set.
    fn kind(&self) -> eyre::Result<NodeKind<'_>>;
}
impl NodeExt for Node {
    /// Classifies the node by which of its mutually exclusive kind fields is set.
    ///
    /// Exactly one of `path`, `operators`, `custom`, and `operator` must be present; the
    /// returned [`NodeKind`] borrows that field.
    ///
    /// # Errors
    ///
    /// Returns an error naming the node ID if none of the four fields is set, or if
    /// more than one of them is set.
    fn kind(&self) -> eyre::Result<NodeKind<'_>> {
        match (&self.path, &self.operators, &self.custom, &self.operator) {
            (None, None, None, None) => {
                // List all four accepted fields, matching the arms below.
                eyre::bail!(
                    "node `{}` requires a `path`, `custom`, `operators`, or `operator` field",
                    self.id
                )
            }
            (None, None, None, Some(operator)) => Ok(NodeKind::Operator(operator)),
            (None, None, Some(custom), None) => Ok(NodeKind::Custom(custom)),
            (None, Some(runtime), None, None) => Ok(NodeKind::Runtime(runtime)),
            (Some(path), None, None, None) => Ok(NodeKind::Standard(path)),
            // Any remaining combination has at least two of the fields set.
            _ => {
                eyre::bail!(
                    "node `{}` has multiple exclusive fields set, only one of `path`, `custom`, `operators` and `operator` is allowed",
                    self.id
                )
            }
        }
    }
}
/// Borrowed view of the field that determines a [`Node`]'s kind, as returned by
/// [`NodeExt::kind`].
#[derive(Debug)]
pub enum NodeKind<'a> {
/// Node defined through its `path` field; holds a reference to that path.
Standard(&'a String),
/// Node defined through its `operators` field.
Runtime(&'a RuntimeNode),
/// Node defined through its `custom` field.
Custom(&'a CustomNode),
/// Node defined through its single `operator` field.
Operator(&'a SingleOperatorDefinition),
}
/// Mutable counterpart of [`NodeKind`], produced by `node_kind_mut`.
#[derive(Debug)]
enum NodeKindMut<'a> {
/// Node defined through its `path` field.
Standard {
// The node's `path` value.
path: &'a String,
// Source derived from the node's `git`, `branch`, `tag`, and `rev` fields.
source: NodeSource,
// The node's top-level input definitions.
inputs: &'a mut BTreeMap<DataId, Input>,
},
/// Node defined through its `operators` field.
Runtime(&'a mut RuntimeNode),
/// Node defined through its `custom` field.
Custom(&'a mut CustomNode),
/// Node defined through its single `operator` field.
Operator(&'a mut SingleOperatorDefinition),
}