use crate::{DaemonNodeEvent, Event};
use dora_core::{
config::{DataId, LocalCommunicationConfig, NodeId},
topics::LOCALHOST,
uhlc,
};
use dora_message::{
DataflowId,
common::{DropToken, Timestamped},
daemon_to_node::{DaemonCommunication, DaemonReply, NodeDropEvent, NodeEvent},
node_to_daemon::DaemonRequest,
};
use eyre::{Context, eyre};
use futures::{Future, future, task};
use std::{
collections::{BTreeMap, VecDeque},
mem,
sync::Arc,
task::Poll,
};
#[cfg(unix)]
use tokio::net::UnixListener;
use tokio::{
net::TcpListener,
sync::{
mpsc::{self, UnboundedReceiver},
oneshot,
},
};
pub mod tcp;
#[cfg(unix)]
pub mod unix_domain;
pub async fn spawn_listener_loop(
dataflow_id: &DataflowId,
node_id: &NodeId,
daemon_tx: &mpsc::Sender<Timestamped<Event>>,
config: LocalCommunicationConfig,
queue_sizes: BTreeMap<DataId, usize>,
clock: Arc<uhlc::HLC>,
) -> eyre::Result<(DaemonCommunication, Option<tokio::task::AbortHandle>)> {
match config {
LocalCommunicationConfig::Tcp => {
let socket = match TcpListener::bind((LOCALHOST, 0)).await {
Ok(socket) => socket,
Err(err) => {
return Err(
eyre::Report::new(err).wrap_err("failed to create local TCP listener")
);
}
};
let socket_addr = socket
.local_addr()
.wrap_err("failed to get local addr of socket")?;
let event_loop_node_id = format!("{dataflow_id}/{node_id}");
let daemon_tx = daemon_tx.clone();
let handle = tokio::spawn(async move {
tcp::listener_loop(socket, daemon_tx, queue_sizes, clock).await;
tracing::debug!("event listener loop finished for `{event_loop_node_id}`");
});
let abort_handle = handle.abort_handle();
Ok((DaemonCommunication::Tcp { socket_addr }, Some(abort_handle)))
}
#[cfg(unix)]
LocalCommunicationConfig::UnixDomain => {
use std::path::Path;
let tmpfile_dir = Path::new("/tmp");
let tmpfile_dir = tmpfile_dir.join(dataflow_id.to_string());
if !tmpfile_dir.exists() {
std::fs::create_dir_all(&tmpfile_dir).context("could not create tmp dir")?;
}
let socket_file = tmpfile_dir.join(format!("{node_id}.sock"));
let socket = match UnixListener::bind(&socket_file) {
Ok(socket) => socket,
Err(err) => {
return Err(eyre::Report::new(err)
.wrap_err("failed to create local Unix domain socket"));
}
};
let event_loop_node_id = format!("{dataflow_id}/{node_id}");
let daemon_tx = daemon_tx.clone();
let handle = tokio::spawn(async move {
unix_domain::listener_loop(socket, daemon_tx, queue_sizes, clock).await;
tracing::debug!("event listener loop finished for `{event_loop_node_id}`");
});
let abort_handle = handle.abort_handle();
Ok((
DaemonCommunication::UnixDomain { socket_file },
Some(abort_handle),
))
}
#[cfg(not(unix))]
LocalCommunicationConfig::UnixDomain => {
eyre::bail!("Communication via UNIX domain sockets is only supported on UNIX systems")
}
}
}
/// State kept for the connection of one registered node.
///
/// An instance is created in `Listener::run` after the node's register
/// request was accepted and lives until the connection loop ends.
struct Listener {
/// Dataflow the node belongs to, taken from the register request.
dataflow_id: DataflowId,
/// ID of the node, taken from the register request.
node_id: NodeId,
/// Channel used to forward node requests to the daemon as `Event::Node`.
daemon_tx: mpsc::Sender<Timestamped<Event>>,
/// Receiver for node events; `None` until a `Subscribe` request succeeded.
subscribed_events: Option<UnboundedReceiver<Timestamped<NodeEvent>>>,
/// Receiver for drop events; `None` until a `SubscribeDrop` request succeeded.
subscribed_drop_events: Option<UnboundedReceiver<Timestamped<NodeDropEvent>>>,
/// Events already taken from `subscribed_events` that are waiting to be
/// handed out with the next `NextEvent` reply.
queue: VecDeque<Box<Option<Timestamped<NodeEvent>>>>,
/// Clock used to timestamp outgoing events; it is updated with the
/// timestamps of incoming requests.
clock: Arc<uhlc::HLC>,
}
impl Listener {
pub(crate) async fn run<C: Connection>(
mut connection: C,
daemon_tx: mpsc::Sender<Timestamped<Event>>,
hlc: Arc<uhlc::HLC>,
) {
let message = match connection
.receive_message()
.await
.wrap_err("failed to receive register message")
{
Ok(Some(m)) => m,
Ok(None) => {
tracing::info!("channel disconnected before register message");
return;
} Err(err) => {
tracing::info!("{err:?}");
return;
}
};
if let Err(err) = hlc.update_with_timestamp(&message.timestamp) {
tracing::warn!("failed to update HLC: {err}");
}
match message.inner {
DaemonRequest::Register(register_request) => {
let result = register_request.check_version();
let send_result = connection
.send_reply(DaemonReply::Result(result.clone()))
.await
.wrap_err("failed to send register reply");
let dataflow_id = register_request.dataflow_id;
let node_id = register_request.node_id;
match (result, send_result) {
(Ok(()), Ok(())) => {
let mut listener = Listener {
dataflow_id,
node_id,
daemon_tx,
subscribed_events: None,
subscribed_drop_events: None,
queue: VecDeque::new(),
clock: hlc.clone(),
};
match listener
.run_inner(connection)
.await
.wrap_err("listener failed")
{
Ok(()) => {}
Err(err) => tracing::error!("{err:?}"),
}
}
(Err(err), _) => {
tracing::warn!("failed to register node {dataflow_id}/{node_id}: {err}");
}
(Ok(()), Err(err)) => {
tracing::warn!(
"failed send register reply to node {dataflow_id}/{node_id}: {err:?}"
);
}
}
}
other => {
tracing::warn!("expected register message, got `{other:?}`");
let reply = DaemonReply::Result(Err("must send register message first".into()));
if let Err(err) = connection
.send_reply(reply)
.await
.wrap_err("failed to send reply")
{
tracing::warn!("{err:?}");
}
}
}
}
/// Request loop of a registered node.
///
/// Waits for the next request on `connection` while, at the same time,
/// moving events that arrive on the subscription channel into `self.queue`.
/// Each received request is passed to `handle_message`; errors from handling
/// or from receiving are only logged and the loop continues. The loop ends
/// with `Ok(())` once `receive_message` yields `Ok(None)`.
///
/// An error is returned only if `handle_events` fails.
async fn run_inner<C: Connection>(&mut self, mut connection: C) -> eyre::Result<()> {
loop {
// One receive future per request; it is kept alive across the inner loop
// so that events arriving in the meantime do not restart the receive.
let mut next_message = connection.receive_message();
let message = loop {
let next_event = self.next_event();
let event = match future::select(next_event, next_message).await {
future::Either::Left((event, n)) => {
// An event arrived first: take back the still-pending receive
// future so it can be polled again in the next iteration.
next_message = n;
event
}
// A request (or receive error / end of stream) arrived: leave the inner loop.
future::Either::Right((message, _)) => break message,
};
self.queue.push_back(Box::new(Some(event)));
// Also pick up any further events that are already available.
self.handle_events().await?;
};
match message.wrap_err("failed to receive DaemonRequest") {
Ok(Some(message)) => {
if let Err(err) = self.handle_message(message, &mut connection).await {
tracing::warn!("{err:?}");
}
}
Err(err) => {
tracing::warn!("{err:?}");
}
Ok(None) => {
// No further messages will arrive on this connection: stop serving it.
break; }
}
}
Ok(())
}
/// Moves all events that are immediately available on the subscription
/// channel to the back of `self.queue`.
///
/// Does nothing when the node has not subscribed yet. Draining stops at the
/// first failing `try_recv`. As written, this method always returns `Ok(())`.
async fn handle_events(&mut self) -> eyre::Result<()> {
    let Some(receiver) = self.subscribed_events.as_mut() else {
        return Ok(());
    };

    let available = std::iter::from_fn(|| receiver.try_recv().ok());
    self.queue
        .extend(available.map(|event| Box::new(Some(event))));

    Ok(())
}
/// Handles one request sent by the node after it has registered.
///
/// The listener clock is first updated with the request's timestamp (a failed
/// update is only logged). The request is then dispatched:
///
/// - `Register` and `NodeConfig` are unexpected here and are answered with an
///   error result.
/// - `OutputsDone`, `CloseOutputs`, `Subscribe`, `SubscribeDrop` and
///   `EventStreamDropped` are forwarded to the daemon; the daemon's reply is
///   relayed to the node. For the two subscribe requests the receiving end of
///   the new channel is stored afterwards.
/// - `SendMessage` is forwarded to the daemon and answered with
///   `DaemonReply::Empty` without waiting for a daemon reply.
/// - `NextEvent` reports the given drop tokens and replies with all queued
///   events. If the queue is empty, it waits for the next subscribed event
///   (an empty list is sent when the channel is closed; an error result when
///   the node never subscribed).
/// - `ReportDropTokens` reports the tokens and replies with
///   `DaemonReply::Empty`.
/// - `NextFinishedDropTokens` waits for the next drop event, analogous to
///   `NextEvent` but without a queue.
///
/// # Errors
///
/// Returns an error if forwarding to the daemon, receiving the daemon's reply,
/// or sending the reply to the node fails.
#[tracing::instrument(skip(self, connection), fields(%self.dataflow_id, %self.node_id), level = "trace")]
async fn handle_message<C: Connection>(
    &mut self,
    message: Timestamped<DaemonRequest>,
    connection: &mut C,
) -> eyre::Result<()> {
    let timestamp = message.timestamp;
    // A failed clock update does not prevent the request from being handled.
    if let Err(err) = self.clock.update_with_timestamp(&timestamp) {
        tracing::warn!("failed to update HLC: {err}");
    }

    match message.inner {
        DaemonRequest::Register { .. } => {
            let reply = DaemonReply::Result(Err("unexpected register message".into()));
            self.send_reply(reply, connection)
                .await
                .wrap_err("failed to send register reply")?;
        }
        DaemonRequest::NodeConfig { .. } => {
            let reply = DaemonReply::Result(Err("unexpected node config message".into()));
            self.send_reply(reply, connection)
                .await
                .wrap_err("failed to send node config reply")?;
        }
        DaemonRequest::OutputsDone => {
            let (reply_sender, reply) = oneshot::channel();
            self.process_daemon_event(
                DaemonNodeEvent::OutputsDone { reply_sender },
                Some(reply),
                connection,
            )
            .await?;
        }
        DaemonRequest::CloseOutputs(outputs) => {
            let (reply_sender, reply) = oneshot::channel();
            self.process_daemon_event(
                DaemonNodeEvent::CloseOutputs {
                    outputs,
                    reply_sender,
                },
                Some(reply),
                connection,
            )
            .await?;
        }
        DaemonRequest::SendMessage {
            output_id,
            metadata,
            data,
        } => {
            let event = DaemonNodeEvent::SendOut {
                output_id,
                metadata,
                data,
            };
            self.process_daemon_event(event, None, connection).await?;
        }
        DaemonRequest::Subscribe => {
            let (tx, rx) = mpsc::unbounded_channel();
            let (reply_sender, reply) = oneshot::channel();
            self.process_daemon_event(
                DaemonNodeEvent::Subscribe {
                    event_sender: tx,
                    reply_sender,
                },
                Some(reply),
                connection,
            )
            .await?;
            // Only stored once the subscribe request went through.
            self.subscribed_events = Some(rx);
        }
        DaemonRequest::SubscribeDrop => {
            let (tx, rx) = mpsc::unbounded_channel();
            let (reply_sender, reply) = oneshot::channel();
            self.process_daemon_event(
                DaemonNodeEvent::SubscribeDrop {
                    event_sender: tx,
                    reply_sender,
                },
                Some(reply),
                connection,
            )
            .await?;
            // Only stored once the subscribe request went through.
            self.subscribed_drop_events = Some(rx);
        }
        DaemonRequest::NextEvent { drop_tokens } => {
            self.report_drop_tokens(drop_tokens).await?;

            // Hand out everything that is already queued first.
            let queued_events: Vec<_> = mem::take(&mut self.queue)
                .into_iter()
                .filter_map(|e| *e)
                .collect();
            let reply = if queued_events.is_empty() {
                match self.subscribed_events.as_mut() {
                    // Nothing queued: wait for the next event.
                    Some(events) => match events.recv().await {
                        Some(event) => DaemonReply::NextEvents(vec![event]),
                        None => DaemonReply::NextEvents(vec![]),
                    },
                    None => {
                        DaemonReply::Result(Err("Ignoring event request because no subscribe \
                                                 message was sent yet"
                            .into()))
                    }
                }
            } else {
                DaemonReply::NextEvents(queued_events)
            };

            self.send_reply(reply.clone(), connection)
                .await
                .wrap_err_with(|| format!("failed to send NextEvent reply: {reply:?}"))?;
        }
        DaemonRequest::ReportDropTokens { drop_tokens } => {
            self.report_drop_tokens(drop_tokens).await?;
            self.send_reply(DaemonReply::Empty, connection)
                .await
                .wrap_err("failed to send ReportDropTokens reply")?;
        }
        DaemonRequest::NextFinishedDropTokens => {
            let reply = match self.subscribed_drop_events.as_mut() {
                // Wait for the next drop event.
                Some(events) => match events.recv().await {
                    Some(event) => DaemonReply::NextDropEvents(vec![event]),
                    None => DaemonReply::NextDropEvents(vec![]),
                },
                None => DaemonReply::Result(Err("Ignoring event request because no drop \
                                                 subscribe message was sent yet"
                    .into())),
            };

            self.send_reply(reply.clone(), connection)
                .await
                .wrap_err_with(|| {
                    format!("failed to send NextFinishedDropTokens reply: {reply:?}")
                })?;
        }
        DaemonRequest::EventStreamDropped => {
            let (reply_sender, reply) = oneshot::channel();
            self.process_daemon_event(
                DaemonNodeEvent::EventStreamDropped { reply_sender },
                Some(reply),
                connection,
            )
            .await?;
        }
    }
    Ok(())
}
/// Forwards the given drop tokens to the daemon as a `ReportDrop` node event.
///
/// An empty token list is a no-op. Otherwise the event is timestamped with the
/// listener clock and sent through `daemon_tx`.
///
/// # Errors
///
/// Returns an error if the event cannot be sent to the daemon.
async fn report_drop_tokens(&mut self, drop_tokens: Vec<DropToken>) -> eyre::Result<()> {
    if drop_tokens.is_empty() {
        return Ok(());
    }

    let report = Timestamped {
        inner: Event::Node {
            dataflow_id: self.dataflow_id,
            node_id: self.node_id.clone(),
            event: DaemonNodeEvent::ReportDrop {
                tokens: drop_tokens,
            },
        },
        timestamp: self.clock.new_timestamp(),
    };

    match self.daemon_tx.send(report).await {
        Ok(()) => Ok(()),
        Err(_) => Err(eyre!("failed to report drop tokens to daemon")),
    }
}
/// Forwards `event` to the daemon and sends the resulting reply to the node.
///
/// The event is wrapped into a timestamped `Event::Node` and sent through
/// `daemon_tx`. When a `reply` receiver is given, the daemon's answer is
/// awaited and relayed; otherwise `DaemonReply::Empty` is sent to the node.
///
/// # Errors
///
/// Returns an error if the event cannot be sent to the daemon, if the reply
/// channel yields no reply, or if sending the reply to the node fails.
async fn process_daemon_event<C: Connection>(
    &mut self,
    event: DaemonNodeEvent,
    reply: Option<oneshot::Receiver<DaemonReply>>,
    connection: &mut C,
) -> eyre::Result<()> {
    let wrapped = Timestamped {
        inner: Event::Node {
            dataflow_id: self.dataflow_id,
            node_id: self.node_id.clone(),
            event,
        },
        timestamp: self.clock.new_timestamp(),
    };
    if self.daemon_tx.send(wrapped).await.is_err() {
        return Err(eyre!("failed to send event to daemon"));
    }

    let answer = match reply {
        Some(receiver) => match receiver.await {
            Ok(daemon_reply) => daemon_reply,
            Err(_) => return Err(eyre!("failed to receive reply from daemon")),
        },
        None => DaemonReply::Empty,
    };

    self.send_reply(answer, connection).await
}
/// Sends `reply` to the node over `connection`.
///
/// # Errors
///
/// On failure the underlying error is wrapped with a message that names the
/// node this listener belongs to.
async fn send_reply<C: Connection>(
    &mut self,
    reply: DaemonReply,
    connection: &mut C,
) -> eyre::Result<()> {
    match connection.send_reply(reply).await {
        Ok(()) => Ok(()),
        Err(err) => Err(err.wrap_err(format!(
            "failed to send reply to node `{}`",
            self.node_id
        ))),
    }
}
/// Returns a future that resolves with the next subscribed node event.
///
/// The future stays pending when the node has not subscribed, when no event
/// is available, and also when the subscription channel reports that it is
/// finished (`Poll::Ready(None)`); it only ever completes with an event.
fn next_event(&mut self) -> impl Future<Output = Timestamped<NodeEvent>> + Unpin + '_ {
    future::poll_fn(|cx: &mut task::Context<'_>| {
        match self.subscribed_events.as_mut() {
            Some(receiver) => match receiver.poll_recv(cx) {
                Poll::Ready(Some(event)) => Poll::Ready(event),
                Poll::Ready(None) | Poll::Pending => Poll::Pending,
            },
            None => Poll::Pending,
        }
    })
}
}
/// Transport-independent view of a connection to a single node.
///
/// `Listener` is generic over this trait and uses it to receive requests and
/// send replies. Implementations are not part of this view — presumably they
/// live in the `tcp` and `unix_domain` submodules (confirm there).
#[async_trait::async_trait]
trait Connection {
/// Receives the next request from the node.
///
/// The callers in this file treat `Ok(None)` as "the node disconnected"
/// and stop serving the connection; `Err` is logged by them.
async fn receive_message(&mut self) -> eyre::Result<Option<Timestamped<DaemonRequest>>>;
/// Sends `message` as a reply to the node.
async fn send_reply(&mut self, message: DaemonReply) -> eyre::Result<()>;
}