use std::{
    collections::BTreeMap,
    net::{IpAddr, SocketAddr},
};

use dora_core::{
    descriptor::{CoreNodeKind, CustomNode, Descriptor, DescriptorExt},
    topics::{DORA_COORDINATOR_PORT_CONTROL_DEFAULT, LOCALHOST},
};
use dora_message::{BuildId, cli_to_coordinator::CoordinatorControlClient, descriptor::NodeSource};
use eyre::Context;

use super::{Executable, default_tracing};
use crate::{
    common::{connect_and_check_version, local_working_dir, resolve_dataflow},
    session::DataflowSession,
};

use distributed::{build_distributed_dataflow, wait_until_dataflow_built};
use local::build_dataflow_locally;
mod distributed;
mod git;
mod local;
#[derive(Debug, clap::Args)]
pub struct Build {
#[clap(value_name = "PATH")]
dataflow: String,
#[clap(long, value_name = "IP")]
coordinator_addr: Option<IpAddr>,
#[clap(long, value_name = "PORT")]
coordinator_port: Option<u16>,
#[clap(long, action)]
uv: bool,
#[clap(long, action)]
local: bool,
}
impl Executable for Build {
async fn execute(self) -> eyre::Result<()> {
default_tracing()?;
build_async(
self.dataflow,
self.coordinator_addr,
self.coordinator_port,
self.uv,
self.local,
)
.await
}
}
pub fn build(
dataflow: String,
coordinator_addr: Option<IpAddr>,
coordinator_port: Option<u16>,
uv: bool,
force_local: bool,
) -> eyre::Result<()> {
let rt = tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()
.context("tokio runtime failed")?;
rt.block_on(build_async(
dataflow,
coordinator_addr,
coordinator_port,
uv,
force_local,
))
}
pub async fn build_async(
dataflow: String,
coordinator_addr: Option<IpAddr>,
coordinator_port: Option<u16>,
uv: bool,
force_local: bool,
) -> eyre::Result<()> {
let dataflow_path = resolve_dataflow(dataflow)
.await
.context("could not resolve dataflow")?;
let dataflow_descriptor =
Descriptor::blocking_read(&dataflow_path).wrap_err("Failed to read yaml dataflow")?;
let mut dataflow_session =
DataflowSession::read_session(&dataflow_path).context("failed to read DataflowSession")?;
let mut git_sources = BTreeMap::new();
let resolved_nodes = dataflow_descriptor
.resolve_aliases_and_set_defaults()
.context("failed to resolve nodes")?;
for (node_id, node) in resolved_nodes {
if let CoreNodeKind::Custom(CustomNode {
source: NodeSource::GitBranch { repo, rev },
..
}) = node.kind
{
let source = git::fetch_commit_hash(repo, rev)
.with_context(|| format!("failed to find commit hash for `{node_id}`"))?;
git_sources.insert(node_id, source);
}
}
let session = || connect_to_coordinator_rpc_with_defaults(coordinator_addr, coordinator_port);
let build_kind = if force_local {
log::info!("Building locally, as requested through `--force-local`");
BuildKind::Local
} else if dataflow_descriptor.nodes.iter().all(|n| n.deploy.is_none()) {
log::info!("Building locally because dataflow does not contain any `deploy` sections");
BuildKind::Local
} else if coordinator_addr.is_some() || coordinator_port.is_some() {
log::info!("Building through coordinator, using the given coordinator socket information");
BuildKind::ThroughCoordinator {
coordinator_client: session()
.await
.context("failed to connect to coordinator")?,
}
} else {
match session().await {
Ok(coordinator_client) => {
log::info!("Found local dora coordinator instance -> building through coordinator");
BuildKind::ThroughCoordinator { coordinator_client }
}
Err(_) => {
log::warn!("No dora coordinator instance found -> trying a local build");
BuildKind::Local
}
}
};
match build_kind {
BuildKind::Local => {
log::info!("running local build");
let local_working_dir = dunce::canonicalize(&dataflow_path)
.context("failed to canonicalize dataflow path")?
.parent()
.ok_or_else(|| eyre::eyre!("dataflow path has no parent dir"))?
.to_owned();
let build_info = build_dataflow_locally(
dataflow_descriptor,
&git_sources,
&dataflow_session,
local_working_dir,
uv,
)
.await?;
dataflow_session.git_sources = git_sources;
dataflow_session.build_id = Some(BuildId::generate());
dataflow_session.local_build = Some(build_info);
dataflow_session
.write_out_for_dataflow(&dataflow_path)
.context("failed to write out dataflow session file")?;
}
BuildKind::ThroughCoordinator { coordinator_client } => {
let coord = coordinator_socket(coordinator_addr, coordinator_port);
let local_working_dir =
local_working_dir(&dataflow_path, &dataflow_descriptor, &coordinator_client)
.await?;
let build_id = build_distributed_dataflow(
&coordinator_client,
dataflow_descriptor,
&git_sources,
&dataflow_session,
local_working_dir,
uv,
)
.await?;
dataflow_session.git_sources = git_sources;
dataflow_session
.write_out_for_dataflow(&dataflow_path)
.context("failed to write out dataflow session file")?;
wait_until_dataflow_built(
build_id,
&coordinator_client,
coordinator_socket(coordinator_addr, coordinator_port),
log::LevelFilter::Info,
)
.await?;
dataflow_session.build_id = Some(build_id);
dataflow_session.local_build = None;
dataflow_session
.write_out_for_dataflow(&dataflow_path)
.context("failed to write out dataflow session file")?;
}
};
Ok(())
}
enum BuildKind {
Local,
ThroughCoordinator {
coordinator_client: CoordinatorControlClient,
},
}
async fn connect_to_coordinator_rpc_with_defaults(
coordinator_addr: Option<std::net::IpAddr>,
coordinator_port: Option<u16>,
) -> eyre::Result<CoordinatorControlClient> {
let addr = coordinator_addr.unwrap_or(LOCALHOST);
let control_port = coordinator_port.unwrap_or(DORA_COORDINATOR_PORT_CONTROL_DEFAULT);
connect_and_check_version(addr, control_port).await
}
fn coordinator_socket(
coordinator_addr: Option<std::net::IpAddr>,
coordinator_port: Option<u16>,
) -> std::net::SocketAddr {
let coordinator_addr = coordinator_addr.unwrap_or(LOCALHOST);
let coordinator_port = coordinator_port.unwrap_or(DORA_COORDINATOR_PORT_CONTROL_DEFAULT);
(coordinator_addr, coordinator_port).into()
}