use crate::{
run::spawn_dataflow,
tcp_utils::{tcp_receive, tcp_send},
};
use control::ControlEvent;
use dora_core::{
config::CommunicationConfig,
coordinator_messages::RegisterResult,
daemon_messages::{DaemonCoordinatorEvent, DaemonCoordinatorReply},
topics::{
control_socket_addr, ControlRequest, ControlRequestReply, DataflowId,
DORA_COORDINATOR_PORT_DEFAULT,
},
};
use eyre::{bail, eyre, ContextCompat, WrapErr};
use futures::{stream::FuturesUnordered, Stream, StreamExt};
use futures_concurrency::stream::Merge;
use run::SpawnedDataflow;
use std::{
collections::{BTreeSet, HashMap},
path::Path,
time::Duration,
};
use tokio::{net::TcpStream, sync::mpsc, task::JoinHandle};
use tokio_stream::wrappers::{ReceiverStream, TcpListenerStream};
use uuid::Uuid;
mod control;
mod listener;
mod run;
mod tcp_utils;
pub async fn run() -> eyre::Result<()> {
let mut tasks = FuturesUnordered::new();
start(&tasks).await?;
tracing::debug!("coordinator main loop finished, waiting on spawned tasks");
while let Some(join_result) = tasks.next().await {
if let Err(err) = join_result {
tracing::error!("task panicked: {err}");
}
}
tracing::debug!("all spawned tasks finished, exiting..");
Ok(())
}
/// Runs the coordinator's main event loop until it is destroyed (via a
/// `Destroy` control request or Ctrl-C).
///
/// Merges four external event sources — control requests, new daemon TCP
/// connections, Ctrl-C signals, and a 1-second watchdog tick — with events
/// forwarded from daemon connection handlers, and dispatches on them.
/// Connection handlers are spawned onto `tasks` so the caller can await
/// their completion after this loop returns.
async fn start(tasks: &FuturesUnordered<JoinHandle<()>>) -> eyre::Result<()> {
    let ctrlc_events = set_up_ctrlc_handler()?;

    // Accept incoming daemon connections on the default coordinator port.
    // Accept errors are converted into `DaemonConnectError` events instead of
    // terminating the stream.
    let listener = listener::create_listener(DORA_COORDINATOR_PORT_DEFAULT).await?;
    let new_daemon_connections = TcpListenerStream::new(listener).map(|c| {
        c.map(Event::NewDaemonConnection)
            .wrap_err("failed to open connection")
            .unwrap_or_else(Event::DaemonConnectError)
    });

    // Channel through which spawned connection handlers report daemon events
    // back into this loop. The sender is kept in an `Option` so it can be
    // dropped on destroy, which closes the channel and lets `daemon_events`
    // (and thus the merged stream) terminate.
    let (daemon_events_tx, daemon_events) = tokio::sync::mpsc::channel(2);
    let mut daemon_events_tx = Some(daemon_events_tx);
    let daemon_events = ReceiverStream::new(daemon_events);

    let control_events = control::control_events(control_socket_addr(), tasks)
        .await
        .wrap_err("failed to create control events")?;

    // Periodic tick used to health-check every connected daemon.
    let daemon_watchdog_interval =
        tokio_stream::wrappers::IntervalStream::new(tokio::time::interval(Duration::from_secs(1)))
            .map(|_| Event::DaemonWatchdogInterval);

    // All external sources go behind a single abort handle so that
    // `handle_destroy` can cut them off at once. The daemon event stream is
    // merged separately: it must keep flowing until its sender is dropped so
    // in-flight daemon messages are still processed during shutdown.
    let (abortable_events, abort_handle) = futures::stream::abortable(
        (
            control_events,
            new_daemon_connections,
            ctrlc_events,
            daemon_watchdog_interval,
        )
            .merge(),
    );
    let mut events = (abortable_events, daemon_events).merge();

    // Dataflows currently running, keyed by their UUID.
    let mut running_dataflows: HashMap<Uuid, RunningDataflow> = HashMap::new();
    // Open TCP connections to registered daemons, keyed by machine ID.
    let mut daemon_connections: HashMap<_, TcpStream> = HashMap::new();

    while let Some(event) = events.next().await {
        // Watchdog ticks are excluded from tracing to avoid log spam.
        if event.log() {
            tracing::trace!("Handling event {event:?}");
        }
        match event {
            Event::NewDaemonConnection(connection) => {
                connection.set_nodelay(true)?;
                let events_tx = daemon_events_tx.clone();
                if let Some(events_tx) = events_tx {
                    // Handle the connection on its own task; it reports back
                    // through the daemon event channel.
                    let task = tokio::spawn(listener::handle_connection(connection, events_tx));
                    tasks.push(task);
                } else {
                    // `daemon_events_tx` is only `None` after a destroy.
                    tracing::warn!(
                        "ignoring new daemon connection because events_tx was closed already"
                    );
                }
            }
            Event::DaemonConnectError(err) => {
                tracing::warn!("{:?}", err.wrap_err("failed to connect to dora-daemon"));
            }
            Event::Daemon(event) => {
                match event {
                    DaemonEvent::Register {
                        machine_id,
                        mut connection,
                    } => {
                        // Acknowledge registration, then adopt the connection;
                        // a re-register replaces (and thereby closes) any
                        // previous connection for the same machine.
                        let reply = RegisterResult::Ok;
                        match tcp_send(&mut connection, &serde_json::to_vec(&reply)?).await {
                            Ok(()) => {
                                let previous =
                                    daemon_connections.insert(machine_id.clone(), connection);
                                if let Some(_previous) = previous {
                                    tracing::info!("closing previous connection `{machine_id}` on new register");
                                }
                            }
                            Err(err) => {
                                tracing::warn!("failed to register daemon connection for machine `{machine_id}`: {err}");
                            }
                        }
                    }
                }
            }
            Event::Dataflow { uuid, event } => match event {
                DataflowEvent::DataflowFinishedOnMachine { machine_id, result } => {
                    match running_dataflows.entry(uuid) {
                        std::collections::hash_map::Entry::Occupied(mut entry) => {
                            // Track per-machine completion; the dataflow is
                            // done once every machine has reported in.
                            entry.get_mut().machines.remove(&machine_id);
                            match result {
                                Ok(()) => {
                                    tracing::info!("dataflow `{uuid}` finished successfully on machine `{machine_id}`");
                                }
                                Err(err) => {
                                    let err =
                                        err.wrap_err(format!("error occured in dataflow `{uuid}` on machine `{machine_id}`"));
                                    tracing::error!("{err:?}");
                                }
                            }
                            if entry.get_mut().machines.is_empty() {
                                entry.remove();
                                tracing::info!("dataflow `{uuid}` finished");
                            }
                        }
                        std::collections::hash_map::Entry::Vacant(_) => {
                            tracing::warn!("dataflow not running on DataflowFinishedOnMachine");
                        }
                    }
                }
            },
            Event::Control(event) => match event {
                ControlEvent::IncomingRequest {
                    request,
                    reply_sender,
                } => {
                    let reply = match request {
                        ControlRequest::Start {
                            dataflow_path,
                            name,
                        } => {
                            // The async block lets `?`/`bail!` short-circuit
                            // into a reply instead of exiting this loop.
                            let inner = async {
                                // Dataflow names must be unique among the
                                // currently running dataflows.
                                if let Some(name) = name.as_deref() {
                                    if running_dataflows
                                        .values()
                                        .any(|d: &RunningDataflow| d.name.as_deref() == Some(name))
                                    {
                                        bail!("there is already a running dataflow with name `{name}`");
                                    }
                                }
                                let dataflow =
                                    start_dataflow(&dataflow_path, name, &mut daemon_connections)
                                        .await?;
                                Ok(dataflow)
                            };
                            inner.await.map(|dataflow| {
                                let uuid = dataflow.uuid;
                                running_dataflows.insert(uuid, dataflow);
                                ControlRequestReply::DataflowStarted { uuid }
                            })
                        }
                        ControlRequest::Stop { dataflow_uuid } => {
                            let stop = async {
                                stop_dataflow(
                                    &running_dataflows,
                                    dataflow_uuid,
                                    &mut daemon_connections,
                                )
                                .await?;
                                Result::<_, eyre::Report>::Ok(())
                            };
                            stop.await.map(|()| ControlRequestReply::DataflowStopped {
                                uuid: dataflow_uuid,
                            })
                        }
                        ControlRequest::StopByName { name } => {
                            let stop = async {
                                // Resolve the name to a UUID; fails if the
                                // name is unknown or ambiguous.
                                let uuids: Vec<_> = running_dataflows
                                    .iter()
                                    .filter(|(_, v)| v.name.as_deref() == Some(name.as_str()))
                                    .map(|(k, _)| k)
                                    .copied()
                                    .collect();
                                let dataflow_uuid = if uuids.is_empty() {
                                    bail!("no running dataflow with name `{name}`");
                                } else if let [uuid] = uuids.as_slice() {
                                    *uuid
                                } else {
                                    bail!("multiple dataflows found with name `{name}`");
                                };
                                stop_dataflow(
                                    &running_dataflows,
                                    dataflow_uuid,
                                    &mut daemon_connections,
                                )
                                .await?;
                                Result::<_, eyre::Report>::Ok(dataflow_uuid)
                            };
                            stop.await
                                .map(|uuid| ControlRequestReply::DataflowStopped { uuid })
                        }
                        ControlRequest::Destroy => {
                            tracing::info!("Received destroy command");
                            handle_destroy(
                                &running_dataflows,
                                &mut daemon_connections,
                                &abort_handle,
                                &mut daemon_events_tx,
                            )
                            .await
                            .map(|()| ControlRequestReply::DestroyOk)
                        }
                        ControlRequest::List => {
                            let mut dataflows: Vec<_> = running_dataflows.values().collect();
                            // Stable listing order: by name, then UUID.
                            dataflows.sort_by_key(|d| (&d.name, d.uuid));
                            Ok(ControlRequestReply::DataflowList {
                                dataflows: dataflows
                                    .into_iter()
                                    .map(|d| DataflowId {
                                        uuid: d.uuid,
                                        name: d.name.clone(),
                                    })
                                    .collect(),
                            })
                        }
                        ControlRequest::DaemonConnected => {
                            // Reports whether at least one daemon is connected.
                            let running = !daemon_connections.is_empty();
                            Ok(ControlRequestReply::DaemonConnected(running))
                        }
                    };
                    // The requester may have gone away; ignore send failures.
                    let _ = reply_sender.send(reply);
                }
                ControlEvent::Error(err) => tracing::error!("{err:?}"),
            },
            Event::DaemonWatchdogInterval => {
                // Ping every daemon; any that fails to ack within 100 ms is
                // considered dead and its connection is dropped.
                let mut disconnected = BTreeSet::new();
                for (machine_id, connection) in &mut daemon_connections {
                    let result: eyre::Result<()> =
                        tokio::time::timeout(Duration::from_millis(100), send_watchdog_message(connection))
                            .await
                            .wrap_err("timeout")
                            .and_then(|r| r).wrap_err_with(||
                        format!("daemon at `{machine_id}` did not react as expected to watchdog message"),
                    );
                    if let Err(err) = result {
                        tracing::warn!("{err:?}");
                        disconnected.insert(machine_id.clone());
                    }
                }
                if !disconnected.is_empty() {
                    tracing::info!("Disconnecting daemons that failed watchdog: {disconnected:?}");
                    for machine_id in disconnected {
                        daemon_connections.remove(&machine_id);
                    }
                }
            }
            Event::CtrlC => {
                tracing::info!("Destroying coordinator after receiving Ctrl-C signal");
                handle_destroy(
                    &running_dataflows,
                    &mut daemon_connections,
                    &abort_handle,
                    &mut daemon_events_tx,
                )
                .await?;
            }
        }
    }

    tracing::info!("stopped");
    Ok(())
}
/// Installs a process-wide Ctrl-C handler and returns a stream that yields
/// `Event::CtrlC` for the first signal. A second Ctrl-C aborts the process
/// immediately.
fn set_up_ctrlc_handler() -> Result<impl Stream<Item = Event>, eyre::ErrReport> {
    let (tx, rx) = mpsc::channel(1);
    let mut already_triggered = false;

    let handler = move || {
        if already_triggered {
            // Second signal: the user wants out NOW.
            tracing::warn!("received second ctrlc signal -> aborting immediately");
            std::process::abort();
        }
        tracing::info!("received ctrlc signal");
        // blocking_send is required here: the ctrlc handler runs on a
        // dedicated (non-async) thread.
        if tx.blocking_send(Event::CtrlC).is_err() {
            tracing::error!("failed to report ctrl-c event to dora-coordinator");
        }
        already_triggered = true;
    };
    ctrlc::set_handler(handler).wrap_err("failed to set ctrl-c handler")?;

    Ok(ReceiverStream::new(rx))
}
/// Shuts the coordinator down: stops accepting further external events,
/// stops every running dataflow, destroys all connected daemons, and closes
/// the daemon event channel so the main event stream can terminate.
async fn handle_destroy(
    running_dataflows: &HashMap<Uuid, RunningDataflow>,
    daemon_connections: &mut HashMap<String, TcpStream>,
    abortable_events: &futures::stream::AbortHandle,
    daemon_events_tx: &mut Option<mpsc::Sender<Event>>,
) -> Result<(), eyre::ErrReport> {
    // Cut off control/connection/ctrl-c/watchdog events first.
    abortable_events.abort();

    // Stop each running dataflow on all of its machines.
    let uuids: Vec<Uuid> = running_dataflows.keys().copied().collect();
    for uuid in uuids {
        stop_dataflow(running_dataflows, uuid, daemon_connections).await?;
    }

    destroy_daemons(daemon_connections).await?;

    // Dropping the sender closes the channel, letting the daemon event
    // stream finish.
    daemon_events_tx.take();
    Ok(())
}
/// Sends a watchdog ping to a daemon and waits for its acknowledgement.
///
/// Called from the periodic watchdog tick in the main loop to detect dead
/// daemon connections; any error (send, receive, or unexpected reply) marks
/// the daemon as disconnected.
async fn send_watchdog_message(connection: &mut TcpStream) -> eyre::Result<()> {
    // Propagate serialization errors with context instead of unwrapping,
    // matching the error-handling style of `stop_dataflow`/`destroy_daemons`.
    let message = serde_json::to_vec(&DaemonCoordinatorEvent::Watchdog)
        .wrap_err("failed to serialize watchdog message")?;
    tcp_send(connection, &message)
        .await
        .wrap_err("failed to send watchdog message to daemon")?;
    // Note: error contexts previously said "stop reply" — a copy-paste from
    // `stop_dataflow`; this is the watchdog exchange.
    let reply_raw = tcp_receive(connection)
        .await
        .wrap_err("failed to receive watchdog reply from daemon")?;
    match serde_json::from_slice(&reply_raw)
        .wrap_err("failed to deserialize watchdog reply from daemon")?
    {
        DaemonCoordinatorReply::WatchdogAck => Ok(()),
        other => bail!("unexpected reply after sending `watchdog`: {other:?}"),
    }
}
/// Bookkeeping record for a dataflow that has been spawned and has not yet
/// finished on all of its machines.
#[allow(dead_code)]
struct RunningDataflow {
    // Optional human-readable name; used for uniqueness checks on start and
    // for lookup by `ControlRequest::StopByName`.
    name: Option<String>,
    uuid: Uuid,
    // Stored from the spawn result but not read in this module (hence the
    // `dead_code` allow on the struct).
    communication_config: CommunicationConfig,
    // Machine IDs the dataflow still runs on; machines are removed as they
    // report `DataflowFinishedOnMachine`, and the dataflow is considered
    // finished once this set is empty.
    machines: BTreeSet<String>,
}
impl PartialEq for RunningDataflow {
    fn eq(&self, other: &Self) -> bool {
        // Compares name, uuid, and machine set only; `communication_config`
        // is excluded — presumably because it does not implement
        // `PartialEq`. NOTE(review): confirm this omission is intentional.
        self.name == other.name && self.uuid == other.uuid && self.machines == other.machines
    }
}
impl Eq for RunningDataflow {}
/// Sends a `StopDataflow` request to every machine the given dataflow runs
/// on and waits for each daemon to confirm the stop.
///
/// Fails if the UUID is unknown, if any machine's daemon connection is
/// missing, or if a daemon reports an error or an unexpected reply.
async fn stop_dataflow(
    running_dataflows: &HashMap<Uuid, RunningDataflow>,
    uuid: Uuid,
    daemon_connections: &mut HashMap<String, TcpStream>,
) -> eyre::Result<()> {
    let dataflow = match running_dataflows.get(&uuid) {
        Some(dataflow) => dataflow,
        None => bail!("No running dataflow found with UUID `{uuid}`"),
    };

    // Serialize the stop request once; it is identical for every machine.
    let message = serde_json::to_vec(&DaemonCoordinatorEvent::StopDataflow { dataflow_id: uuid })?;

    for machine_id in &dataflow.machines {
        let connection = daemon_connections
            .get_mut(machine_id)
            .wrap_err("no daemon connection")?;
        tcp_send(connection, &message)
            .await
            .wrap_err("failed to send stop message to daemon")?;

        let raw_reply = tcp_receive(connection)
            .await
            .wrap_err("failed to receive stop reply from daemon")?;
        let reply = serde_json::from_slice(&raw_reply)
            .wrap_err("failed to deserialize stop reply from daemon")?;
        match reply {
            DaemonCoordinatorReply::StopResult(result) => result
                .map_err(|e| eyre!(e))
                .wrap_err("failed to stop dataflow")?,
            other => bail!("unexpected reply after sending stop: {other:?}"),
        }
    }

    tracing::info!("successfully stopped dataflow `{uuid}`");
    Ok(())
}
/// Spawns the dataflow described by the file at `path` on the connected
/// daemons and returns the bookkeeping record to track it with.
async fn start_dataflow(
    path: &Path,
    name: Option<String>,
    daemon_connections: &mut HashMap<String, TcpStream>,
) -> eyre::Result<RunningDataflow> {
    let spawned = spawn_dataflow(path, daemon_connections).await?;
    Ok(RunningDataflow {
        uuid: spawned.uuid,
        name,
        communication_config: spawned.communication_config,
        machines: spawned.machines,
    })
}
/// Sends a `Destroy` message to every connected daemon, waits for each one
/// to acknowledge, and drains the connection map.
///
/// Returns on the first failing daemon; `drain` has already removed the
/// entries by then, so remaining connections are simply dropped.
async fn destroy_daemons(daemon_connections: &mut HashMap<String, TcpStream>) -> eyre::Result<()> {
    let message = serde_json::to_vec(&DaemonCoordinatorEvent::Destroy)?;
    for (machine_id, mut daemon_connection) in daemon_connections.drain() {
        tcp_send(&mut daemon_connection, &message)
            .await
            .wrap_err("failed to send destroy message to daemon")?;
        let reply_raw = tcp_receive(&mut daemon_connection)
            .await
            .wrap_err("failed to receive destroy reply from daemon")?;
        match serde_json::from_slice(&reply_raw)
            .wrap_err("failed to deserialize destroy reply from daemon")?
        {
            DaemonCoordinatorReply::DestroyResult(result) => result
                .map_err(|e| eyre!(e))
                // fixed context: previously said "failed to destroy dataflow"
                // (copy-paste from `stop_dataflow`); this destroys a daemon.
                .wrap_err("failed to destroy daemon")?,
            other => bail!("unexpected reply after sending `destroy`: {other:?}"),
        }
        tracing::info!("successfully destroyed daemon `{machine_id}`");
    }
    Ok(())
}
/// Events handled by the coordinator's main loop in `start`.
#[derive(Debug)]
pub enum Event {
    // A daemon opened a TCP connection on the coordinator port.
    NewDaemonConnection(TcpStream),
    // Accepting a daemon connection failed.
    DaemonConnectError(eyre::Report),
    // Progress report for a running dataflow.
    Dataflow { uuid: Uuid, event: DataflowEvent },
    // Request or error from the control socket.
    Control(ControlEvent),
    // Message forwarded by a daemon connection handler.
    Daemon(DaemonEvent),
    // Periodic (1 s) tick triggering a health check of all daemons.
    DaemonWatchdogInterval,
    // Ctrl-C was received; triggers a coordinator destroy.
    CtrlC,
}
impl Event {
    /// Whether this event should be traced by the main loop; the periodic
    /// watchdog tick is excluded to avoid log spam.
    pub fn log(&self) -> bool {
        !matches!(self, Event::DaemonWatchdogInterval)
    }
}
/// Per-dataflow progress events, carried inside `Event::Dataflow`.
#[derive(Debug)]
pub enum DataflowEvent {
    // A single machine finished its part of the dataflow, either cleanly
    // (`Ok`) or with an error.
    DataflowFinishedOnMachine {
        machine_id: String,
        result: eyre::Result<()>,
    },
}
/// Messages forwarded from daemon connection handlers, carried inside
/// `Event::Daemon`.
#[derive(Debug)]
pub enum DaemonEvent {
    // A daemon registered itself; the coordinator acknowledges over
    // `connection` and then keeps it for future communication.
    Register {
        machine_id: String,
        connection: TcpStream,
    },
}