use crate::{server::CoordinatorControlServer, tcp_utils::tcp_send};
pub use control::ControlEvent;
use dashmap::{
DashMap,
mapref::one::{Ref, RefMut},
};
use dora_core::{
config::{NodeId, OperatorId},
descriptor::DescriptorExt,
uhlc::{self, HLC},
};
use dora_message::{
BuildId, SessionId,
cli_to_coordinator::{
BuildRequest, CoordinatorControl, CoordinatorControlClient, CoordinatorControlRequest,
CoordinatorControlResponse,
},
common::DaemonId,
coordinator_to_cli::{DataflowResult, LogMessage, StopDataflowReply},
coordinator_to_daemon::{
BuildDataflowNodes, DaemonControlClient, DaemonControlRequest, DaemonControlResponse,
RegisterResult, Timestamped,
},
daemon_to_coordinator::DataflowDaemonResult,
descriptor::{Descriptor, ResolvedNode},
tarpc::{
self, ClientMessage, Response, Transport, client,
server::{BaseChannel, Channel},
tokio_serde,
},
};
use eyre::{ContextCompat, Result, WrapErr, bail, eyre};
use futures::{Future, Stream, StreamExt, future, stream::FuturesUnordered};
use futures_concurrency::stream::Merge;
use itertools::Itertools;
use log_subscriber::LogSubscriber;
use std::{
collections::{BTreeMap, BTreeSet},
net::SocketAddr,
path::PathBuf,
sync::Arc,
time::{Duration, Instant},
};
use tokio::{
net::TcpStream,
sync::{mpsc, oneshot},
task::JoinHandle,
};
use tokio_stream::wrappers::ReceiverStream;
use uuid::Uuid;
mod control;
mod listener;
mod log_subscriber;
mod run;
mod server;
mod state;
mod tcp_utils;
/// Starts the coordinator with TCP-based daemon and control interfaces.
///
/// Binds the daemon listener to `bind` and the control interfaces to
/// `bind_control`: legacy control events on `bind_control` itself, plus a
/// tarpc JSON-RPC server on the RPC port derived from the control port.
///
/// Returns the actual daemon listen port (relevant when `bind` used port 0)
/// together with the coordinator main-loop future, which the caller must
/// poll to drive the coordinator.
pub async fn start(
    bind: SocketAddr,
    bind_control: SocketAddr,
    external_events: impl Stream<Item = Event> + Unpin,
) -> Result<(u16, impl Future<Output = eyre::Result<()>>), eyre::ErrReport> {
    let tasks = FuturesUnordered::new();
    let control_events = control::control_events(bind_control, &tasks)
        .await
        .wrap_err("failed to create control events")?;
    let (daemon_port, coordinator_state, future) =
        init_coordinator(bind, external_events, control_events, tasks).await?;
    // The RPC port is derived from the control port (see dora_core::topics).
    let rpc_bind = SocketAddr::new(
        bind_control.ip(),
        dora_core::topics::dora_coordinator_port_rpc(bind_control.port()),
    );
    let listener =
        tarpc::serde_transport::tcp::listen(rpc_bind, tokio_serde::formats::Json::default)
            .await
            .wrap_err("failed to start tarpc server for control messages")?;
    let stream = listener
        // ignore connections that fail during accept/setup
        .filter_map(|c| future::ready(c.ok()))
        .map(move |transport| {
            let client_ip = transport.peer_addr().ok().map(|addr| addr.ip());
            serve_control_requests(transport, coordinator_state.clone(), client_ip)
        });
    // Serve each incoming control connection in its own task so one slow
    // client cannot block the accept loop.
    tokio::spawn(stream.for_each(|handle_connection| async {
        tokio::spawn(handle_connection);
    }));
    Ok((daemon_port, future))
}
/// Starts the coordinator with an in-process channel-based control interface
/// instead of a TCP RPC server (useful for embedding the coordinator, e.g.
/// in tests).
///
/// Returns a ready-to-use control client plus the coordinator main-loop
/// future, which the caller must poll to drive the coordinator.
pub async fn start_with_channel_rpc(
    bind: SocketAddr,
    external_events: impl Stream<Item = Event> + Unpin,
) -> Result<
    (
        CoordinatorControlClient,
        impl Future<Output = eyre::Result<()>>,
    ),
    eyre::ErrReport,
> {
    let tasks = FuturesUnordered::new();
    // No separate control-event stream in this mode; only RPC requests.
    let (_daemon_port, coordinator_state, future) =
        init_coordinator(bind, external_events, futures::stream::empty(), tasks).await?;
    let (client_transport, server_transport) = tarpc::transport::channel::unbounded();
    tokio::spawn(serve_control_requests(
        server_transport,
        coordinator_state,
        None, // no client IP for in-process connections
    ));
    let control_client =
        CoordinatorControlClient::new(client::Config::default(), client_transport).spawn();
    Ok((control_client, future))
}
/// Shared setup for both `start` variants: binds the daemon listener, merges
/// all event sources into a single stream, creates the shared coordinator
/// state, and builds the (not yet running) main-loop future.
///
/// Returns `(daemon_port, state, future)`. After the main loop exits, the
/// future awaits all spawned connection-handler tasks before resolving.
async fn init_coordinator(
    bind: SocketAddr,
    external_events: impl Stream<Item = Event> + Unpin,
    control_events: impl Stream<Item = Event> + Unpin,
    mut tasks: FuturesUnordered<JoinHandle<()>>,
) -> Result<(
    u16,
    Arc<state::CoordinatorState>,
    impl Future<Output = eyre::Result<()>>,
)> {
    use tokio_stream::wrappers::TcpListenerStream;
    let daemon_listener = listener::create_listener(bind).await?;
    // Report the actual port (relevant when `bind` used port 0).
    let daemon_port = daemon_listener
        .local_addr()
        .wrap_err("failed to get local addr of daemon listener")?
        .port();
    // Accept errors are turned into events instead of aborting the stream.
    let new_daemon_connections = TcpListenerStream::new(daemon_listener).map(|c| {
        c.map(Event::NewDaemonConnection)
            .wrap_err("failed to open connection")
            .unwrap_or_else(Event::DaemonConnectError)
    });
    let ctrlc_events = set_up_ctrlc_handler()?;
    let events = (
        external_events,
        new_daemon_connections,
        control_events,
        ctrlc_events,
    )
        .merge();
    // Periodic tick driving the daemon heartbeat watchdog in `start_inner`.
    let daemon_heartbeat_interval =
        tokio_stream::wrappers::IntervalStream::new(tokio::time::interval(Duration::from_secs(3)))
            .map(|_| Event::DaemonHeartbeatInterval);
    // The abort handle lets `handle_destroy` cut off all external event
    // sources; daemon events still flow through the mpsc channel below.
    let (abortable_events, abort_handle) =
        futures::stream::abortable((events, daemon_heartbeat_interval).merge());
    let (daemon_events_tx, daemon_events) = tokio::sync::mpsc::channel(100);
    let coordinator_state = Arc::new(state::CoordinatorState {
        clock: Arc::new(HLC::default()),
        running_builds: Default::default(),
        finished_builds: Default::default(),
        running_dataflows: Default::default(),
        dataflow_results: Default::default(),
        archived_dataflows: Default::default(),
        daemon_connections: Default::default(),
        daemon_events_tx,
        abort_handle,
    });
    let state_for_caller = coordinator_state.clone();
    let future = async move {
        start_inner(abortable_events, &tasks, daemon_events, coordinator_state).await?;
        tracing::debug!("coordinator main loop finished, waiting on spawned tasks");
        // Drain spawned tasks; panics are logged but not fatal.
        while let Some(join_result) = tasks.next().await {
            if let Err(err) = join_result {
                tracing::error!("task panicked: {err}");
            }
        }
        tracing::debug!("all spawned tasks finished, exiting..");
        Ok(())
    };
    Ok((daemon_port, state_for_caller, future))
}
/// Serves CLI control requests arriving over the given tarpc `transport`.
///
/// Each in-flight request is executed in its own tokio task; the returned
/// future completes when the transport closes. `client_ip` is `None` for
/// in-process (channel) transports.
fn serve_control_requests<T>(
    transport: T,
    state: Arc<state::CoordinatorState>,
    client_ip: Option<std::net::IpAddr>,
) -> impl Future<Output = ()>
where
    T: Transport<Response<CoordinatorControlResponse>, ClientMessage<CoordinatorControlRequest>>
        + Send
        + 'static,
    T::Error: std::error::Error + Send + Sync + 'static,
{
    let channel = BaseChannel::with_defaults(transport);
    let server = CoordinatorControlServer { state, client_ip };
    channel.execute(server.serve()).for_each(|fut| async {
        tokio::spawn(fut);
    })
}
/// Resolves a human-readable dataflow name to a UUID.
///
/// Running dataflows take precedence over archived ones; the lookup fails if
/// no dataflow has the given name or if the name is ambiguous.
fn resolve_name(
    name: String,
    running_dataflows: &DashMap<Uuid, RunningDataflow>,
    archived_dataflows: &DashMap<Uuid, ArchivedDataflow>,
) -> eyre::Result<Uuid> {
    let running: Vec<Uuid> = running_dataflows
        .iter()
        .filter(|entry| entry.value().name.as_deref() == Some(name.as_str()))
        .map(|entry| *entry.key())
        .collect();
    match running.as_slice() {
        [single] => Ok(*single),
        [] => {
            // No running match -> fall back to archived dataflows.
            let archived: Vec<Uuid> = archived_dataflows
                .iter()
                .filter(|entry| entry.value().name.as_deref() == Some(name.as_str()))
                .map(|entry| *entry.key())
                .collect();
            match archived.as_slice() {
                [single] => Ok(*single),
                [] => bail!("no dataflow with name `{name}`"),
                _ => bail!(
                    "multiple archived dataflows found with name `{name}`, Please provide the UUID instead."
                ),
            }
        }
        _ => bail!("multiple dataflows found with name `{name}`"),
    }
}
/// Thread-safe registry of all currently connected daemons.
#[derive(Default)]
pub(crate) struct DaemonConnections {
    // Map from daemon ID to its connection; replaced entries are dropped
    // (see `add`, which logs the replacement as a close).
    daemons: DashMap<DaemonId, DaemonConnection>,
}
impl DaemonConnections {
    /// Registers a daemon connection; an existing entry under the same ID is
    /// replaced (and thereby dropped).
    fn add(&self, daemon_id: DaemonId, connection: DaemonConnection) {
        if self.daemons.insert(daemon_id.clone(), connection).is_some() {
            tracing::info!("closing previous connection `{daemon_id}` on new register");
        }
    }
    /// Shared reference to the connection for `id`, if connected.
    fn get(&self, id: &DaemonId) -> Option<Ref<'_, DaemonId, DaemonConnection>> {
        self.daemons.get(id)
    }
    /// Exclusive reference to the connection for `id`, if connected.
    pub(crate) fn get_mut(&self, id: &DaemonId) -> Option<RefMut<'_, DaemonId, DaemonConnection>> {
        self.daemons.get_mut(id)
    }
    /// ID of the first connected daemon matching the given machine ID.
    fn get_matching_daemon_id(&self, machine_id: &str) -> Option<DaemonId> {
        self.daemons.iter().find_map(|entry| {
            let id = entry.key();
            id.matches_machine_id(machine_id).then(|| id.clone())
        })
    }
    /// Drops all daemon connections.
    fn clear(&self) {
        self.daemons.clear();
    }
    /// `true` when no daemon is connected.
    fn is_empty(&self) -> bool {
        self.daemons.is_empty()
    }
    /// All connected daemon IDs (cloned).
    fn keys(&self) -> impl Iterator<Item = DaemonId> {
        self.daemons.iter().map(|entry| entry.key().clone())
    }
    /// Iterates over all daemon entries.
    fn iter(
        &self,
    ) -> impl Iterator<Item = dashmap::mapref::multiple::RefMulti<'_, DaemonId, DaemonConnection>>
    {
        self.daemons.iter()
    }
    /// Removes and returns the connection for `daemon_id`, if any.
    pub(crate) fn remove(&self, daemon_id: &DaemonId) -> Option<DaemonConnection> {
        let (_, connection) = self.daemons.remove(daemon_id)?;
        Some(connection)
    }
    /// IDs of all daemons that registered without a machine ID.
    fn unnamed(&self) -> impl Iterator<Item = DaemonId> {
        self.daemons.iter().filter_map(|entry| {
            let id = entry.key();
            if id.machine_id().is_none() {
                Some(id.clone())
            } else {
                None
            }
        })
    }
}
/// Coordinator main event loop: processes daemon registrations, control
/// events, log messages, and heartbeat ticks until a `Close` event arrives
/// or all event sources end.
async fn start_inner(
    events: impl Stream<Item = Event> + Unpin,
    tasks: &FuturesUnordered<JoinHandle<()>>,
    daemon_events: tokio::sync::mpsc::Receiver<Event>,
    coordinator_state: Arc<state::CoordinatorState>,
) -> eyre::Result<()> {
    let clock = coordinator_state.clock.clone();
    let daemon_events = ReceiverStream::new(daemon_events);
    let mut events = (events, daemon_events).merge();
    while let Some(event) = events.next().await {
        // Track per-event handling time; slow events are reported below.
        let start = Instant::now();
        let event_kind = event.kind();
        if event.log() {
            tracing::trace!("Handling event {event:?}");
        }
        match event {
            Event::Close => {
                tracing::info!("Received Close event, shutting down coordinator");
                break;
            }
            Event::NewDaemonConnection(connection) => {
                connection.set_nodelay(true)?;
                let events_tx = coordinator_state.daemon_events_tx.clone();
                if !events_tx.is_closed() {
                    // Handle the connection in a separate task; it reports
                    // back through the daemon-events channel.
                    let task = tokio::spawn(listener::handle_connection(
                        connection,
                        events_tx,
                        clock.clone(),
                    ));
                    tasks.push(task);
                } else {
                    tracing::warn!(
                        "ignoring new daemon connection because events_tx was closed already"
                    );
                }
            }
            Event::DaemonConnectError(err) => {
                tracing::warn!("{:?}", err.wrap_err("failed to connect to dora-daemon"));
            }
            Event::Daemon(event) => match event {
                DaemonRequest::Register {
                    machine_id,
                    machine_uid,
                    mut connection,
                    version_check_result,
                } => {
                    // Reject a second daemon claiming an already-registered
                    // machine ID (or a second unnamed daemon).
                    let existing = match &machine_id {
                        Some(id) => coordinator_state
                            .daemon_connections
                            .get_matching_daemon_id(id),
                        None => coordinator_state.daemon_connections.unnamed().next(),
                    };
                    let existing_result = if existing.is_some() {
                        Err(format!(
                            "There is already a connected daemon with machine ID `{machine_id:?}`"
                        ))
                    } else {
                        Ok(())
                    };
                    let daemon_id = DaemonId::new(machine_id);
                    // Send the register reply over the raw TCP connection
                    // before upgrading it to a tarpc transport.
                    let reply: Timestamped<RegisterResult> = Timestamped {
                        inner: match version_check_result.as_ref().and(existing_result.as_ref()) {
                            Ok(_) => RegisterResult::Ok {
                                daemon_id: daemon_id.clone(),
                            },
                            Err(err) => RegisterResult::Err(err.clone()),
                        },
                        timestamp: clock.new_timestamp(),
                    };
                    let send_result = tcp_send(&mut connection, &serde_json::to_vec(&reply)?)
                        .await
                        .context("tcp send failed");
                    // Only register the daemon if the version check, the
                    // duplicate check, and the reply send all succeeded.
                    match version_check_result
                        .map_err(|e| eyre!(e))
                        .and(existing_result.map_err(|e| eyre!(e)))
                        .and(send_result)
                    {
                        Ok(()) => {
                            let peer_addr = connection.peer_addr().ok();
                            let codec = tokio_serde::formats::Json::<
                                Response<DaemonControlResponse>,
                                ClientMessage<DaemonControlRequest>,
                            >::default();
                            let transport =
                                tarpc::serde_transport::Transport::from((connection, codec));
                            let daemon_client =
                                DaemonControlClient::new(client::Config::default(), transport)
                                    .spawn();
                            coordinator_state.daemon_connections.add(
                                daemon_id.clone(),
                                DaemonConnection {
                                    client: daemon_client,
                                    last_heartbeat: Instant::now(),
                                    peer_addr,
                                    machine_uid,
                                },
                            );
                        }
                        Err(err) => {
                            tracing::warn!(
                                "failed to register daemon connection for daemon `{daemon_id}`: {err}"
                            );
                        }
                    }
                }
                DaemonRequest::RegisterNotificationChannel {
                    daemon_id,
                    connection,
                } => {
                    use dora_message::daemon_to_coordinator::{
                        CoordinatorNotify, CoordinatorNotifyRequest, CoordinatorNotifyResponse,
                    };
                    use tarpc::server::{BaseChannel, Channel};
                    // Reverse direction: the coordinator acts as the RPC
                    // server so the daemon can push notifications.
                    let codec = tokio_serde::formats::Json::<
                        ClientMessage<CoordinatorNotifyRequest>,
                        Response<CoordinatorNotifyResponse>,
                    >::default();
                    let transport = tarpc::serde_transport::Transport::from((connection, codec));
                    let server = listener::CoordinatorNotifyServer {
                        daemon_id: daemon_id.clone(),
                        coordinator_state: coordinator_state.clone(),
                    };
                    let channel = BaseChannel::with_defaults(transport);
                    tokio::spawn(channel.execute(server.serve()).for_each(|fut| async {
                        tokio::spawn(fut);
                    }));
                    tracing::info!(
                        "reverse-channel RPC server established for daemon `{daemon_id}`"
                    );
                }
            },
            Event::Control(event) => match event {
                ControlEvent::Error(err) => tracing::error!("{err:?}"),
                ControlEvent::LogSubscribe {
                    dataflow_id,
                    level,
                    connection,
                } => {
                    if let Some(mut dataflow) =
                        coordinator_state.running_dataflows.get_mut(&dataflow_id)
                    {
                        dataflow
                            .log_subscribers
                            .push(LogSubscriber::new(level, connection));
                        // Flush messages buffered before any subscriber
                        // was attached.
                        let buffered = std::mem::take(&mut dataflow.buffered_log_messages);
                        for message in buffered {
                            send_log_message(&mut dataflow.log_subscribers, &message).await;
                        }
                    }
                }
                ControlEvent::BuildLogSubscribe {
                    build_id,
                    level,
                    connection,
                } => {
                    if let Some(mut build) = coordinator_state.running_builds.get_mut(&build_id) {
                        build
                            .log_subscribers
                            .push(LogSubscriber::new(level, connection));
                        // Flush messages buffered before any subscriber
                        // was attached.
                        let buffered = std::mem::take(&mut build.buffered_log_messages);
                        for message in buffered {
                            send_log_message(&mut build.log_subscribers, &message).await;
                        }
                    }
                }
            },
            Event::DaemonHeartbeatInterval => {
                // Snapshot the connections first so no dashmap ref is held
                // across the awaits/spawns below.
                let daemons_to_check: Vec<(DaemonId, Duration, DaemonControlClient)> =
                    coordinator_state
                        .daemon_connections
                        .iter()
                        .map(|r| {
                            (
                                r.key().clone(),
                                r.value().last_heartbeat.elapsed(),
                                r.value().client.clone(),
                            )
                        })
                        .collect();
                let mut disconnected = BTreeSet::new();
                for (machine_id, elapsed, client) in daemons_to_check {
                    // Warn after 15s of silence; give up after 30s.
                    if elapsed > Duration::from_secs(15) {
                        tracing::warn!(
                            "no heartbeat message from machine `{machine_id}` since {elapsed:?}",
                        )
                    }
                    if elapsed > Duration::from_secs(30) {
                        disconnected.insert(machine_id);
                        continue;
                    }
                    // Heartbeats are sent concurrently so one unresponsive
                    // daemon does not delay the others.
                    tokio::spawn(async move {
                        if let Err(err) = client.heartbeat(tarpc::context::current()).await {
                            tracing::warn!(
                                "failed to send heartbeat to daemon `{machine_id}`: {err}"
                            );
                        }
                    });
                }
                if !disconnected.is_empty() {
                    tracing::error!("Disconnecting daemons that failed watchdog: {disconnected:?}");
                    for machine_id in disconnected {
                        coordinator_state.daemon_connections.remove(&machine_id);
                    }
                }
            }
            Event::CtrlC => {
                tracing::info!("Destroying coordinator after receiving Ctrl-C signal");
                handle_destroy(&coordinator_state).await?;
            }
            Event::Log(message) => {
                if let Some(dataflow_id) = &message.dataflow_id {
                    if let Some(mut dataflow) =
                        coordinator_state.running_dataflows.get_mut(dataflow_id)
                    {
                        if dataflow.log_subscribers.is_empty() {
                            // no subscriber yet -> buffer until one attaches
                            dataflow.buffered_log_messages.push(message);
                        } else {
                            send_log_message(&mut dataflow.log_subscribers, &message).await;
                        }
                    }
                } else if let Some(build_id) = &message.build_id {
                    if let Some(mut build) = coordinator_state.running_builds.get_mut(build_id) {
                        if build.log_subscribers.is_empty() {
                            // no subscriber yet -> buffer until one attaches
                            build.buffered_log_messages.push(message);
                        } else {
                            send_log_message(&mut build.log_subscribers, &message).await;
                        }
                    }
                }
            }
        }
        // Report events whose handling blocked the loop noticeably.
        let elapsed = start.elapsed();
        if elapsed > Duration::from_millis(100) {
            tracing::warn!(
                "Coordinator took {}ms for handling event: {event_kind}",
                elapsed.as_millis()
            );
        }
    }
    tracing::info!("stopped");
    Ok(())
}
/// Delivers `message` to every subscriber; a subscriber that does not accept
/// the message within 100ms is closed. Closed subscribers are pruned from the
/// list afterwards.
pub(crate) async fn send_log_message(
    log_subscribers: &mut Vec<LogSubscriber>,
    message: &LogMessage,
) {
    for subscriber in log_subscribers.iter_mut() {
        let deadline = Duration::from_millis(100);
        if tokio::time::timeout(deadline, subscriber.send_message(message))
            .await
            .is_err()
        {
            subscriber.close();
        }
    }
    log_subscribers.retain(|subscriber| !subscriber.is_closed());
}
/// Combines the per-daemon results of a finished dataflow into one
/// `DataflowResult`, advancing the HLC past every daemon-reported timestamp
/// along the way.
pub(crate) fn dataflow_result(
    results: &BTreeMap<DaemonId, DataflowDaemonResult>,
    dataflow_uuid: Uuid,
    clock: &uhlc::HLC,
) -> DataflowResult {
    let mut node_results = BTreeMap::new();
    for daemon_result in results.values() {
        // Keep the clock ahead of all daemon timestamps; failures are
        // logged but do not invalidate the result.
        if let Err(err) = clock.update_with_timestamp(&daemon_result.timestamp) {
            tracing::warn!("failed to update HLC: {err}");
        }
        node_results.extend(daemon_result.node_results.clone());
    }
    let timestamp = clock.new_timestamp();
    DataflowResult {
        uuid: dataflow_uuid,
        timestamp,
        node_results,
    }
}
/// State kept per connected daemon.
pub(crate) struct DaemonConnection {
    /// tarpc client for sending control requests to the daemon.
    client: DaemonControlClient,
    /// Timestamp used by the heartbeat watchdog in `start_inner`; set to
    /// `Instant::now()` at registration.
    pub(crate) last_heartbeat: Instant,
    /// Remote address of the daemon connection, if available.
    peer_addr: Option<SocketAddr>,
    /// Unique machine ID reported by the daemon at registration, if any.
    machine_uid: Option<String>,
}
async fn handle_destroy(
coordinator_state: &state::CoordinatorState,
) -> Result<(), eyre::ErrReport> {
coordinator_state.abort_handle.abort();
for dataflow_uuid in coordinator_state
.running_dataflows
.iter()
.map(|entry| *entry.key())
.collect::<Vec<_>>()
{
let _ = stop_dataflow(
&coordinator_state.running_dataflows,
dataflow_uuid,
&coordinator_state.daemon_connections,
None,
false,
)
.await?;
}
let result = destroy_daemons(&coordinator_state.daemon_connections).await;
let _ = coordinator_state.daemon_events_tx.send(Event::Close).await;
result
}
/// Final outcome of a dataflow build, delivered to build-result listeners.
#[derive(Debug, Clone)]
pub struct BuildFinishedResult {
    /// ID of the build this result belongs to.
    pub build_id: BuildId,
    /// `Ok` on success, otherwise an error description string.
    pub result: Result<(), String>,
}
/// Coordinator-side state for a dataflow build that is still in progress.
pub(crate) struct RunningBuild {
    /// Errors reported so far for this build.
    pub(crate) errors: Vec<String>,
    /// Overall build result; pending until all daemons reported back.
    pub(crate) build_result: CachedResult<BuildFinishedResult>,
    /// Log messages buffered until the first subscriber attaches
    /// (flushed in `start_inner`).
    pub(crate) buffered_log_messages: Vec<LogMessage>,
    /// Active log subscribers for this build.
    pub(crate) log_subscribers: Vec<LogSubscriber>,
    /// Daemons that have not yet reported their build result.
    pub(crate) pending_build_results: BTreeSet<DaemonId>,
}
/// Coordinator-side bookkeeping for a currently running dataflow.
pub(crate) struct RunningDataflow {
    /// Optional human-readable name (resolvable via `resolve_name`).
    name: Option<String>,
    pub(crate) uuid: Uuid,
    /// The descriptor this dataflow was started from.
    descriptor: Descriptor,
    /// All daemons that run at least one node of this dataflow.
    pub(crate) daemons: BTreeSet<DaemonId>,
    /// Daemons that still need to confirm; only populated for multi-daemon
    /// dataflows (see `start_dataflow`).
    pub(crate) pending_daemons: BTreeSet<DaemonId>,
    pub(crate) exited_before_subscribe: Vec<NodeId>,
    /// All resolved nodes of the dataflow, keyed by node ID.
    nodes: BTreeMap<NodeId, ResolvedNode>,
    /// Which daemon each node was assigned to.
    node_to_daemon: BTreeMap<NodeId, DaemonId>,
    node_metrics: BTreeMap<NodeId, dora_message::daemon_to_coordinator::NodeMetrics>,
    /// Spawn outcome; pending until all daemons reported spawn results.
    pub(crate) spawn_result: CachedResult<Uuid>,
    /// Reply channels of pending stop requests.
    pub(crate) stop_reply_senders:
        Vec<tokio::sync::oneshot::Sender<eyre::Result<StopDataflowReply>>>,
    /// Log messages buffered until the first subscriber attaches
    /// (flushed in `start_inner`).
    pub(crate) buffered_log_messages: Vec<LogMessage>,
    /// Active log subscribers for this dataflow.
    pub(crate) log_subscribers: Vec<LogSubscriber>,
    /// Daemons that have not yet reported their spawn result.
    pub(crate) pending_spawn_results: BTreeSet<DaemonId>,
}
/// A result that may still be pending: interested parties register oneshot
/// senders and are notified once the result is set; afterwards the result is
/// cached and handed to later registrants immediately.
pub enum CachedResult<T> {
    Pending {
        /// Listeners to notify once the result becomes available.
        result_senders: Vec<tokio::sync::oneshot::Sender<eyre::Result<T>>>,
    },
    Cached {
        /// The final result; later `register` calls receive a copy directly.
        result: eyre::Result<T>,
    },
}
impl<T> Default for CachedResult<T> {
fn default() -> Self {
Self::Pending {
result_senders: Vec::new(),
}
}
}
impl<T: Clone> CachedResult<T> {
    /// Registers a listener: notified on `set_result`, or immediately if the
    /// result is already cached.
    fn register(&mut self, reply_sender: tokio::sync::oneshot::Sender<eyre::Result<T>>) {
        match self {
            CachedResult::Pending { result_senders } => result_senders.push(reply_sender),
            CachedResult::Cached { result } => {
                Self::send_result_to(result, reply_sender);
            }
        }
    }
    /// Sets the result and notifies all registered listeners. Subsequent
    /// calls are ignored — the first result wins.
    fn set_result(&mut self, result: eyre::Result<T>) {
        match self {
            CachedResult::Pending { result_senders } => {
                for sender in result_senders.drain(..) {
                    Self::send_result_to(&result, sender);
                }
                *self = CachedResult::Cached { result };
            }
            CachedResult::Cached { .. } => {}
        }
    }
    /// Sends a copy of `result` to `sender`, ignoring send failures (the
    /// receiver may have been dropped).
    fn send_result_to(result: &eyre::Result<T>, sender: oneshot::Sender<eyre::Result<T>>) {
        // `eyre::Report` is not `Clone`, so errors are duplicated by
        // re-formatting the original report.
        let result = match result {
            Ok(r) => Ok(r.clone()),
            Err(err) => Err(eyre!("{err:?}")),
        };
        let _ = sender.send(result);
    }
}
/// Minimal record kept for a no-longer-running dataflow so that it can still
/// be resolved by name and its logs retrieved (see `retrieve_logs`).
pub(crate) struct ArchivedDataflow {
    name: Option<String>,
    nodes: BTreeMap<NodeId, ResolvedNode>,
}
impl From<&RunningDataflow> for ArchivedDataflow {
fn from(dataflow: &RunningDataflow) -> ArchivedDataflow {
ArchivedDataflow {
name: dataflow.name.clone(),
nodes: dataflow.nodes.clone(),
}
}
}
impl PartialEq for RunningDataflow {
    /// Equality considers only identity-like fields (UUID, name, daemon
    /// set); runtime bookkeeping such as subscribers and metrics is ignored.
    fn eq(&self, other: &Self) -> bool {
        let same_identity = self.uuid == other.uuid && self.name == other.name;
        same_identity && self.daemons == other.daemons
    }
}
impl Eq for RunningDataflow {}
/// Sends a stop request for `dataflow_uuid` to every daemon running part of
/// the dataflow.
///
/// On success, returns a mutable reference to the (still present) dataflow
/// entry so the caller can e.g. register stop-reply senders; the entry is
/// removed from `running_dataflows` elsewhere.
async fn stop_dataflow<'a>(
    running_dataflows: &'a DashMap<Uuid, RunningDataflow>,
    dataflow_uuid: Uuid,
    daemon_connections: &DaemonConnections,
    grace_duration: Option<Duration>,
    force: bool,
) -> eyre::Result<RefMut<'a, Uuid, RunningDataflow>> {
    // Snapshot the daemon IDs in a block so the dashmap `Ref` is dropped
    // before the awaits below (avoids holding the shard lock across awaits).
    let daemon_ids: Vec<DaemonId> = {
        let Some(dataflow) = running_dataflows.get(&dataflow_uuid) else {
            bail!("no known running dataflow found with UUID `{dataflow_uuid}`")
        };
        dataflow.daemons.iter().cloned().collect()
    };
    for daemon_id in &daemon_ids {
        // Clone the client so the connection ref is released before awaiting.
        let client = daemon_connections
            .get(daemon_id)
            .wrap_err("no daemon connection")?
            .client
            .clone();
        client
            .stop_dataflow(
                tarpc::context::current(),
                dataflow_uuid,
                grace_duration,
                force,
            )
            .await
            .context("RPC transport error")?
            .map_err(|e: String| eyre!(e))
            .wrap_err("failed to stop dataflow")?;
    }
    tracing::info!("successfully send stop dataflow `{dataflow_uuid}` to all daemons");
    running_dataflows
        .get_mut(&dataflow_uuid)
        .wrap_err("dataflow was removed while sending stop commands")
}
/// Sends a reload request for one node (and optionally one operator) of a
/// running dataflow to every daemon that runs part of it.
async fn reload_dataflow(
    running_dataflows: &DashMap<Uuid, RunningDataflow>,
    dataflow_id: Uuid,
    node_id: NodeId,
    operator_id: Option<OperatorId>,
    daemon_connections: &DaemonConnections,
) -> eyre::Result<()> {
    // Snapshot the daemon set first so no dashmap ref lives across an await.
    let daemons: Vec<DaemonId> = match running_dataflows.get(&dataflow_id) {
        Some(dataflow) => dataflow.daemons.iter().cloned().collect(),
        None => bail!("No running dataflow found with UUID `{dataflow_id}`"),
    };
    for daemon_id in &daemons {
        let connection = daemon_connections
            .get(daemon_id)
            .wrap_err("no daemon connection")?;
        let client = connection.client.clone();
        // Release the map ref before performing the RPC below.
        drop(connection);
        client
            .reload_dataflow(
                tarpc::context::current(),
                dataflow_id,
                node_id.clone(),
                operator_id.clone(),
            )
            .await
            .context("RPC transport error")?
            .map_err(|e: String| eyre!(e))
            .wrap_err("failed to reload dataflow")?;
    }
    tracing::info!("successfully reloaded dataflow `{dataflow_id}`");
    Ok(())
}
/// Fetches the logs of a single node of a dataflow (running or archived)
/// from the daemon that hosts that node.
///
/// `tail` limits the result to the last N lines if set. Fails when the node
/// is unknown or its machine assignment is ambiguous.
async fn retrieve_logs(
    running_dataflows: &DashMap<Uuid, RunningDataflow>,
    archived_dataflows: &DashMap<Uuid, ArchivedDataflow>,
    dataflow_id: Uuid,
    node_id: NodeId,
    daemon_connections: &DaemonConnections,
    tail: Option<usize>,
) -> eyre::Result<Vec<u8>> {
    // Archived entries are checked first so logs of finished dataflows stay
    // retrievable.
    let nodes = if let Some(dataflow) = archived_dataflows.get(&dataflow_id) {
        dataflow.nodes.clone()
    } else if let Some(dataflow) = running_dataflows.get(&dataflow_id) {
        dataflow.nodes.clone()
    } else {
        bail!("No dataflow found with UUID `{dataflow_id}`")
    };
    // Determine which machine the node was deployed to (None = unnamed).
    let machine_ids: Vec<Option<String>> = nodes
        .values()
        .filter(|node| node.id == node_id)
        .map(|node| node.deploy.as_ref().and_then(|d| d.machine.clone()))
        .collect();
    let machine_id = if let [machine_id] = &machine_ids[..] {
        machine_id
    } else if machine_ids.is_empty() {
        bail!("No machine contains {}/{}", dataflow_id, node_id)
    } else {
        bail!(
            "More than one machine contains {}/{}. However, it should only be present on one.",
            dataflow_id,
            node_id
        )
    };
    // Map the machine ID to the connected daemon; must be unambiguous.
    let daemon_ids: Vec<_> = match machine_id {
        None => daemon_connections.unnamed().collect(),
        Some(machine_id) => daemon_connections
            .get_matching_daemon_id(machine_id)
            .into_iter()
            .collect(),
    };
    let daemon_id = match &daemon_ids[..] {
        [id] => (*id).clone(),
        [] => eyre::bail!("no matching daemon connections for machine ID `{machine_id:?}`"),
        _ => eyre::bail!("multiple matching daemon connections for machine ID `{machine_id:?}`"),
    };
    // Clone the client so the connection ref is released before awaiting.
    let client = daemon_connections
        .get(&daemon_id)
        .wrap_err_with(|| format!("no daemon connection to `{daemon_id}`"))?
        .client
        .clone();
    let reply_logs = client
        .logs(
            tarpc::context::current(),
            dataflow_id,
            node_id.clone(),
            tail,
        )
        .await
        .context("RPC transport error")?;
    tracing::info!("successfully retrieved logs for `{dataflow_id}/{node_id}`");
    reply_logs.map_err(|err: String| eyre!(err))
}
#[tracing::instrument(skip(daemon_connections))]
/// Triggers a dataflow build: resolves the descriptor, groups nodes and git
/// sources by target machine, and sends one build command per daemon.
///
/// Returns the `RunningBuild` bookkeeping entry; the build result itself is
/// reported asynchronously by the daemons (see `pending_build_results`).
async fn build_dataflow(
    build_request: BuildRequest,
    build_id: BuildId,
    daemon_connections: &DaemonConnections,
) -> eyre::Result<RunningBuild> {
    let BuildRequest {
        session_id,
        dataflow,
        git_sources,
        prev_git_sources,
        local_working_dir,
        uv,
    } = build_request;
    let nodes = dataflow.resolve_aliases_and_set_defaults()?;
    // Group git sources by the machine their node is deployed to (None key
    // = unnamed daemon).
    let mut git_sources_by_daemon = git_sources
        .into_iter()
        .into_grouping_map_by(|(id, _)| {
            nodes
                .get(id)
                .and_then(|n| n.deploy.as_ref().and_then(|d| d.machine.as_ref()))
        })
        .collect();
    let mut prev_git_sources_by_daemon = prev_git_sources
        .into_iter()
        .into_grouping_map_by(|(id, _)| {
            nodes
                .get(id)
                .and_then(|n| n.deploy.as_ref().and_then(|d| d.machine.as_ref()))
        })
        .collect();
    let nodes_by_daemon = nodes
        .values()
        .into_group_map_by(|n| n.deploy.as_ref().and_then(|d| d.machine.as_ref()));
    let mut daemons = BTreeSet::new();
    for (machine, nodes_on_machine) in &nodes_by_daemon {
        let nodes_on_machine = nodes_on_machine.iter().map(|n| n.id.clone()).collect();
        tracing::debug!(
            "Running dataflow build `{build_id}` on machine `{machine:?}` (nodes: {nodes_on_machine:?})"
        );
        let build_command = BuildDataflowNodes {
            build_id,
            session_id,
            local_working_dir: local_working_dir.clone(),
            git_sources: git_sources_by_daemon.remove(machine).unwrap_or_default(),
            prev_git_sources: prev_git_sources_by_daemon
                .remove(machine)
                .unwrap_or_default(),
            dataflow_descriptor: dataflow.clone(),
            nodes_on_machine,
            uv,
        };
        let daemon_id = build_dataflow_on_machine(
            daemon_connections,
            machine.map(|s| s.as_str()),
            build_command,
        )
        .await
        .wrap_err_with(|| format!("failed to build dataflow on machine `{machine:?}`"))?;
        daemons.insert(daemon_id);
    }
    tracing::info!("successfully triggered dataflow build `{build_id}`",);
    Ok(RunningBuild {
        errors: Vec::new(),
        build_result: CachedResult::default(),
        buffered_log_messages: Vec::new(),
        log_subscribers: Vec::new(),
        // Each contacted daemon must still report its build result.
        pending_build_results: daemons,
    })
}
/// Sends a build command to the daemon matching `machine`, or to the single
/// unnamed daemon when no machine ID is given.
///
/// Returns the ID of the daemon the build command was sent to.
async fn build_dataflow_on_machine(
    daemon_connections: &DaemonConnections,
    machine: Option<&str>,
    build_command: BuildDataflowNodes,
) -> Result<DaemonId, eyre::ErrReport> {
    // Both lookups already yield owned `DaemonId`s, so no clone is needed.
    let daemon_id = match machine {
        Some(machine) => daemon_connections
            .get_matching_daemon_id(machine)
            .wrap_err_with(|| format!("no matching daemon for machine id {machine:?}"))?,
        None => daemon_connections
            .unnamed()
            .next()
            .wrap_err("no unnamed daemon connections")?,
    };
    // Clone the client so the dashmap ref is not held across the await.
    let client = daemon_connections
        .get(&daemon_id)
        .wrap_err_with(|| format!("no daemon connection for daemon `{daemon_id}`"))?
        .client
        .clone();
    client
        .build(tarpc::context::current(), build_command)
        .await
        .context("RPC transport error")?
        .map_err(|e: String| eyre!(e))
        .wrap_err("daemon returned an error")?;
    Ok(daemon_id)
}
#[allow(clippy::too_many_arguments)]
/// Plans and starts a dataflow: registers a `RunningDataflow` entry and then
/// sends the spawn commands to all involved daemons.
///
/// If executing the plan fails, the freshly inserted entry is removed again
/// so no stale dataflow remains registered. Returns the dataflow UUID.
async fn start_dataflow(
    build_id: Option<BuildId>,
    session_id: SessionId,
    dataflow: Descriptor,
    local_working_dir: Option<PathBuf>,
    name: Option<String>,
    daemon_connections: &DaemonConnections,
    running_dataflows: &DashMap<Uuid, RunningDataflow>,
    uv: bool,
    write_events_to: Option<PathBuf>,
) -> eyre::Result<Uuid> {
    let plan = run::plan_dataflow(
        build_id,
        session_id,
        &dataflow,
        local_working_dir,
        daemon_connections,
        uv,
        write_events_to,
    )?;
    let uuid = plan.uuid;
    let daemons = plan.daemons.clone();
    let run::DataflowPlan {
        uuid: _,
        daemons: _,
        nodes,
        node_to_daemon,
        daemon_spawn_commands,
    } = plan;
    // Register the dataflow before spawning so daemon replies arriving
    // during spawn find the entry.
    running_dataflows.insert(
        uuid,
        RunningDataflow {
            uuid,
            name,
            descriptor: dataflow,
            // Cross-daemon coordination is only needed when more than one
            // daemon is involved.
            pending_daemons: if daemons.len() > 1 {
                daemons.clone()
            } else {
                BTreeSet::new()
            },
            exited_before_subscribe: Default::default(),
            daemons: daemons.clone(),
            nodes,
            node_to_daemon,
            node_metrics: BTreeMap::new(),
            spawn_result: CachedResult::default(),
            stop_reply_senders: Vec::new(),
            buffered_log_messages: Vec::new(),
            log_subscribers: Vec::new(),
            pending_spawn_results: daemons,
        },
    );
    if let Err(err) =
        run::execute_dataflow_plan(uuid, daemon_spawn_commands, daemon_connections).await
    {
        // Roll back the registration on failure.
        running_dataflows.remove(&uuid);
        return Err(err);
    }
    Ok(uuid)
}
/// Sends a destroy request to a single daemon and waits for its reply.
async fn destroy_daemon(daemon_id: DaemonId, client: DaemonControlClient) -> Result<()> {
    client
        .destroy(tarpc::context::current())
        .await
        // build the context message lazily so it only allocates on error
        .wrap_err_with(|| format!("failed to send destroy message to daemon `{daemon_id}`"))?
        .map_err(|e: String| eyre!(e))
        .wrap_err("failed to destroy daemon")?;
    tracing::info!("successfully destroyed daemon `{daemon_id}`");
    Ok(())
}
/// Destroys all connected daemons concurrently, then clears the connection
/// map regardless of outcome. The first error (if any) is returned.
async fn destroy_daemons(daemon_connections: &DaemonConnections) -> eyre::Result<()> {
    // Snapshot the connections so no dashmap ref is held across the awaits.
    let snapshot: Vec<(DaemonId, DaemonControlClient)> = daemon_connections
        .iter()
        .map(|entry| (entry.key().clone(), entry.value().client.clone()))
        .collect();
    let destroy_futures = snapshot.into_iter().map(|(daemon_id, client)| {
        tracing::info!("Destroying daemon connection for `{daemon_id}`");
        destroy_daemon(daemon_id, client)
    });
    let results = futures::future::join_all(destroy_futures).await;
    daemon_connections.clear();
    results.into_iter().collect::<eyre::Result<()>>()
}
/// Events processed by the coordinator main loop (`start_inner`).
#[derive(Debug)]
pub enum Event {
    /// A daemon opened a new TCP connection to the daemon listener.
    NewDaemonConnection(TcpStream),
    /// Accepting a daemon connection failed.
    DaemonConnectError(eyre::Report),
    /// An event from the CLI control interface (see the `control` module).
    Control(ControlEvent),
    /// A request from a connected daemon.
    Daemon(DaemonRequest),
    /// Periodic tick driving the daemon heartbeat watchdog.
    DaemonHeartbeatInterval,
    /// Ctrl-C was received; triggers a graceful destroy.
    CtrlC,
    /// A log message to forward to subscribers (or buffer until one attaches).
    Log(LogMessage),
    /// Shut down the main loop.
    Close,
}
impl Event {
    /// Whether this event should be traced; heartbeat ticks fire too
    /// frequently to be worth logging.
    pub fn log(&self) -> bool {
        !matches!(self, Event::DaemonHeartbeatInterval)
    }
    /// Short static label for the event, used in slow-event warnings.
    fn kind(&self) -> &'static str {
        match self {
            Event::Close => "Close",
            Event::CtrlC => "CtrlC",
            Event::Log(_) => "Log",
            Event::Control(_) => "Control",
            Event::Daemon(_) => "Daemon",
            Event::DaemonHeartbeatInterval => "DaemonHeartbeatInterval",
            Event::DaemonConnectError(_) => "DaemonConnectError",
            Event::NewDaemonConnection(_) => "NewDaemonConnection",
        }
    }
}
/// Requests sent by daemons over their TCP connection (see `listener`).
#[derive(Debug)]
pub enum DaemonRequest {
    /// Initial registration of a daemon connection.
    Register {
        /// Machine ID the daemon was configured with, if any.
        machine_id: Option<String>,
        /// Unique ID of the daemon's machine, if reported.
        machine_uid: Option<String>,
        /// The raw connection; upgraded to a tarpc transport on success.
        connection: TcpStream,
        /// Result of the daemon-side version compatibility check.
        version_check_result: Result<(), String>,
    },
    /// Registers the reverse channel over which an already-registered daemon
    /// pushes notifications to the coordinator.
    RegisterNotificationChannel {
        daemon_id: DaemonId,
        connection: TcpStream,
    },
}
/// Installs a process-wide Ctrl-C handler and returns the resulting event
/// stream.
///
/// The first Ctrl-C is forwarded as `Event::CtrlC` (which triggers a
/// graceful destroy); a second Ctrl-C aborts the process immediately.
fn set_up_ctrlc_handler() -> Result<impl Stream<Item = Event>, eyre::ErrReport> {
    let (ctrlc_tx, ctrlc_rx) = mpsc::channel(1);
    let mut ctrlc_sent = false;
    ctrlc::set_handler(move || {
        if ctrlc_sent {
            tracing::warn!("received second ctrlc signal -> aborting immediately");
            std::process::abort();
        } else {
            tracing::info!("received ctrlc signal");
            // blocking_send: this handler runs on a dedicated signal thread,
            // not inside the tokio runtime.
            if ctrlc_tx.blocking_send(Event::CtrlC).is_err() {
                tracing::error!("failed to report ctrl-c event to dora-coordinator");
            }
            ctrlc_sent = true;
        }
    })
    .wrap_err("failed to set ctrl-c handler")?;
    Ok(ReceiverStream::new(ctrlc_rx))
}