use std::{
collections::{BTreeSet, HashMap, HashSet},
sync::Arc,
};
use dora_core::{config::NodeId, uhlc::HLC};
use dora_message::{
DataflowId,
common::{DaemonId, Timestamped},
daemon_to_coordinator::{CoordinatorNotifyClient, LogMessage},
daemon_to_node::DaemonReply,
tarpc,
};
use eyre::bail;
use tokio::sync::{mpsc, oneshot};
use crate::{CascadingErrorCauses, Event, log::DataflowLogger};
pub struct PendingNodes {
dataflow_id: DataflowId,
#[allow(dead_code)]
daemon_id: DaemonId,
local_nodes: HashSet<NodeId>,
external_nodes: bool,
waiting_subscribers: HashMap<NodeId, oneshot::Sender<DaemonReply>>,
exited_before_subscribe: Vec<NodeId>,
reported_init_to_coordinator: bool,
events_tx: mpsc::Sender<Timestamped<Event>>,
clock: Arc<HLC>,
}
impl PendingNodes {
pub fn new(
dataflow_id: DataflowId,
daemon_id: DaemonId,
events_tx: mpsc::Sender<Timestamped<Event>>,
clock: Arc<HLC>,
) -> Self {
Self {
dataflow_id,
daemon_id,
local_nodes: HashSet::new(),
external_nodes: false,
waiting_subscribers: HashMap::new(),
exited_before_subscribe: Default::default(),
reported_init_to_coordinator: false,
events_tx,
clock,
}
}
pub fn insert(&mut self, node_id: NodeId) {
self.local_nodes.insert(node_id);
}
pub fn set_external_nodes(&mut self, value: bool) {
self.external_nodes = value;
}
pub fn local_nodes_pending(&self) -> bool {
!self.local_nodes.is_empty()
}
pub async fn handle_node_subscription(
&mut self,
node_id: NodeId,
reply_sender: oneshot::Sender<DaemonReply>,
coordinator_client: &Option<CoordinatorNotifyClient>,
clock: &HLC,
cascading_errors: &mut CascadingErrorCauses,
logger: &mut DataflowLogger<'_>,
) -> eyre::Result<DataflowStatus> {
self.waiting_subscribers
.insert(node_id.clone(), reply_sender);
self.local_nodes.remove(&node_id);
self.update_dataflow_status(coordinator_client, clock, cascading_errors, logger)
.await
}
pub async fn handle_node_stop(
&mut self,
node_id: &NodeId,
coordinator_client: &Option<CoordinatorNotifyClient>,
clock: &HLC,
cascading_errors: &mut CascadingErrorCauses,
logger: &mut DataflowLogger<'_>,
) -> eyre::Result<()> {
if self.local_nodes.remove(node_id) {
logger
.log(
dora_message::daemon_to_coordinator::LogLevel::Warn,
Some(node_id.clone()),
Some("daemon::pending".into()),
"node exited before initializing dora connection",
)
.await;
self.exited_before_subscribe.push(node_id.clone());
self.update_dataflow_status(coordinator_client, clock, cascading_errors, logger)
.await?;
}
Ok(())
}
pub fn handle_node_stop_sync(
&mut self,
node_id: &NodeId,
cascading_errors: &mut CascadingErrorCauses,
) -> bool {
if self.local_nodes.remove(node_id) {
self.exited_before_subscribe.push(node_id.clone());
if self.local_nodes.is_empty() && !self.external_nodes {
self.answer_subscribe_requests(Vec::new(), cascading_errors);
}
true
} else {
false
}
}
pub fn check_and_answer_subscribers(
&mut self,
cascading_errors: &mut CascadingErrorCauses,
) -> bool {
if !self.local_nodes.is_empty() {
return false;
}
if self.external_nodes {
if !self.reported_init_to_coordinator {
self.reported_init_to_coordinator = true;
true
} else {
false
}
} else {
self.answer_subscribe_requests(Vec::new(), cascading_errors);
false
}
}
pub fn exited_before_subscribe(&self) -> &[NodeId] {
&self.exited_before_subscribe
}
pub async fn handle_dataflow_stop(
&mut self,
coordinator_client: &Option<CoordinatorNotifyClient>,
clock: &HLC,
cascading_errors: &mut CascadingErrorCauses,
dynamic_nodes: &BTreeSet<NodeId>,
logger: &mut DataflowLogger<'_>,
) -> eyre::Result<Vec<LogMessage>> {
for node_id in dynamic_nodes {
if self.local_nodes.remove(node_id) {
self.update_dataflow_status(coordinator_client, clock, cascading_errors, logger)
.await?;
}
}
Ok(Vec::new())
}
pub async fn handle_external_all_nodes_ready(
&mut self,
exited_before_subscribe: Vec<NodeId>,
cascading_errors: &mut CascadingErrorCauses,
) -> eyre::Result<()> {
if !self.local_nodes.is_empty() {
bail!("received external `all_nodes_ready` event before local nodes were ready");
}
self.answer_subscribe_requests(exited_before_subscribe, cascading_errors);
Ok(())
}
async fn update_dataflow_status(
&mut self,
coordinator_client: &Option<CoordinatorNotifyClient>,
clock: &HLC,
cascading_errors: &mut CascadingErrorCauses,
logger: &mut DataflowLogger<'_>,
) -> eyre::Result<DataflowStatus> {
if self.local_nodes.is_empty() {
if self.external_nodes {
if !self.reported_init_to_coordinator {
self.report_nodes_ready(coordinator_client, clock, logger)
.await?;
self.reported_init_to_coordinator = true;
}
Ok(DataflowStatus::Pending)
} else {
self.answer_subscribe_requests(Vec::new(), cascading_errors);
Ok(DataflowStatus::AllNodesReady)
}
} else {
Ok(DataflowStatus::Pending)
}
}
fn answer_subscribe_requests(
&mut self,
exited_before_subscribe_external: Vec<NodeId>,
cascading_errors: &mut CascadingErrorCauses,
) {
let node_exited_before_subscribe = match self.exited_before_subscribe.as_slice() {
[first, ..] => Some(first),
[] => match exited_before_subscribe_external.as_slice() {
[first, ..] => Some(first),
[] => None,
},
};
let result = match &node_exited_before_subscribe {
Some(causing_node) => Err(format!(
"Node {causing_node} exited before initializing dora. For \
more information, run `dora logs {} {causing_node}`.",
self.dataflow_id
)),
None => Ok(()),
};
let subscribe_replies = std::mem::take(&mut self.waiting_subscribers);
for (node_id, reply_sender) in subscribe_replies.into_iter() {
if let Some(causing_node) = node_exited_before_subscribe {
cascading_errors.report_cascading_error(causing_node.clone(), node_id.clone());
}
let _ = reply_sender.send(DaemonReply::Result(result.clone()));
}
}
async fn report_nodes_ready(
&self,
coordinator_client: &Option<CoordinatorNotifyClient>,
_clock: &HLC,
logger: &mut DataflowLogger<'_>,
) -> eyre::Result<()> {
let Some(client) = coordinator_client else {
bail!("no coordinator client to send AllNodesReady");
};
logger
.log(
dora_message::daemon_to_coordinator::LogLevel::Info,
None,
Some("daemon".into()),
format!(
"all local nodes are ready (exit before subscribe: {:?}), waiting for remote nodes",
self.exited_before_subscribe
),
)
.await;
let client = client.clone();
let dataflow_id = self.dataflow_id;
let exited = self.exited_before_subscribe.clone();
let events_tx = self.events_tx.clone();
let clock = self.clock.clone();
tokio::spawn(async move {
if let Err(err) = client
.all_nodes_ready(tarpc::context::current(), dataflow_id, exited)
.await
{
let _ = events_tx
.send(Timestamped {
inner: Event::DaemonError(
eyre::eyre!(err).wrap_err("failed to send AllNodesReady RPC"),
),
timestamp: clock.new_timestamp(),
})
.await;
}
});
Ok(())
}
}
pub enum DataflowStatus {
AllNodesReady,
Pending,
}