use crate::{
Event,
tcp_utils::{tcp_receive, tcp_send},
};
use dora_message::{BuildId, cli_to_coordinator::LegacyControlRequest};
use eyre::{Context, eyre};
use futures::{
FutureExt, Stream, StreamExt,
future::{self, Either},
stream::FuturesUnordered,
};
use futures_concurrency::future::Race;
use std::{io::ErrorKind, net::SocketAddr};
use tokio::{
net::{TcpListener, TcpStream},
sync::mpsc,
task::JoinHandle,
};
use tokio_stream::wrappers::ReceiverStream;
use uuid::Uuid;
/// Starts the control listener and returns the stream of resulting events.
///
/// Two tasks are spawned and registered in `tasks`:
///
/// 1. the [`listen`] task, which accepts connections on `control_listen_addr`
///    and reports through a bounded channel (capacity 10);
/// 2. a helper task that finishes only once every clone of the "finish"
///    sender has been dropped, i.e. once the listener and all connection
///    handlers holding a clone have ended.
///
/// Every [`ControlEvent`] received from the channel is wrapped in
/// `Event::Control`. This function itself always returns `Ok`; a failure to
/// bind the address is delivered as an event by the listener task instead.
pub(crate) async fn control_events(
    control_listen_addr: SocketAddr,
    tasks: &FuturesUnordered<JoinHandle<()>>,
) -> eyre::Result<impl Stream<Item = Event> + use<>> {
    let (event_tx, event_rx) = mpsc::channel(10);
    let (finish_tx, mut finish_rx) = mpsc::channel(1);

    let listener = tokio::spawn(listen(control_listen_addr, event_tx, finish_tx));
    tasks.push(listener);

    // Nothing is ever sent on the finish channel: `recv` only returns `None`
    // after all senders are gone, which is what this task waits for.
    let wait_for_handlers = tokio::spawn(async move {
        while finish_rx.recv().await.is_some() {}
    });
    tasks.push(wait_for_handlers);

    let events = ReceiverStream::new(event_rx).map(Event::Control);
    Ok(events)
}
/// Accepts control connections and spawns a [`handle_requests`] task for each.
///
/// * `control_listen_addr` - address the TCP listener binds to.
/// * `tx` - channel used to report events; bind and accept failures are sent
///   through it as `ControlEvent::Error`.
/// * `_finish_tx` - never sent on; a clone is handed to every connection
///   handler and the original is dropped when this function returns.
///
/// The function returns when binding fails or when the receiving side of `tx`
/// is closed.
async fn listen(
    control_listen_addr: SocketAddr,
    tx: mpsc::Sender<ControlEvent>,
    _finish_tx: mpsc::Sender<()>,
) {
    let bind_result = TcpListener::bind(control_listen_addr)
        .await
        .wrap_err("failed to listen for control messages");
    let incoming = match bind_result {
        Ok(incoming) => incoming,
        Err(err) => {
            // Best effort: if the receiver is already gone nobody is left to
            // care about the error.
            let _ = tx.send(err.into()).await;
            return;
        }
    };

    loop {
        // Wait for either a new connection or the event receiver going away.
        let new_connection = incoming.accept().map(Either::Left);
        let coordinator_stop = tx.closed().map(Either::Right);
        let connection = match (new_connection, coordinator_stop).race().await {
            future::Either::Left(connection) => connection,
            future::Either::Right(()) => break,
        };

        match connection.wrap_err("failed to connect") {
            Ok((connection, _)) => {
                tokio::spawn(handle_requests(connection, tx.clone(), _finish_tx.clone()));
            }
            Err(err) => {
                // `blocking_send` panics when called from within the async
                // runtime, so the error has to be sent with an awaited `send`.
                // A send error means the receiver is closed: stop listening.
                if tx.send(err.into()).await.is_err() {
                    break;
                }
                // otherwise keep listening for further connections
            }
        }
    }
}
async fn handle_requests(
mut connection: TcpStream,
tx: mpsc::Sender<ControlEvent>,
_finish_tx: mpsc::Sender<()>,
) {
let _peer_addr = connection.peer_addr().ok();
let next_request = tcp_receive(&mut connection).map(Either::Left);
let coordinator_stopped = tx.closed().map(Either::Right);
let raw = match (next_request, coordinator_stopped).race().await {
Either::Right(()) => return,
Either::Left(request) => match request {
Ok(message) => message,
Err(err) => match err.kind() {
ErrorKind::UnexpectedEof => {
tracing::trace!("Control connection closed");
return;
}
err => {
let err = eyre!(err).wrap_err("failed to receive incoming message");
tracing::error!("{err}");
return;
}
},
},
};
let request = serde_json::from_slice(&raw).wrap_err("failed to deserialize incoming message");
match request {
Ok(request) => match request {
LegacyControlRequest::LogSubscribe { dataflow_id, level } => {
let _ = tx
.send(ControlEvent::LogSubscribe {
dataflow_id,
level,
connection,
})
.await;
}
LegacyControlRequest::BuildLogSubscribe { build_id, level } => {
let _ = tx
.send(ControlEvent::BuildLogSubscribe {
build_id,
level,
connection,
})
.await;
}
},
Err(err) => {
tracing::warn!("failed to parse incoming control message: {:#?}", err);
let reply = format!("failed to parse message: {err:?}");
let serialized: Vec<u8> =
match serde_json::to_vec(&reply).wrap_err("failed to serialize message") {
Ok(s) => s,
Err(err) => {
tracing::error!("{err:?}");
return;
}
};
match tcp_send(&mut connection, &serialized).await {
Ok(()) => {}
Err(err) => match err.kind() {
ErrorKind::UnexpectedEof => {
tracing::debug!("Control connection closed while trying to send reply");
}
err => {
let err = eyre!(err).wrap_err("failed to send reply");
tracing::error!("{err}");
}
},
};
}
}
}
/// Events produced by the control listener and its connection handlers.
///
/// The subscribe variants own the TCP connection on which the request
/// arrived, so whoever receives the event takes over that connection.
#[derive(Debug)]
pub enum ControlEvent {
/// Built from a `LegacyControlRequest::LogSubscribe` message.
LogSubscribe {
/// Dataflow id taken from the request.
dataflow_id: Uuid,
/// Log level filter taken from the request.
level: log::LevelFilter,
/// Connection the request was received on.
connection: TcpStream,
},
/// Built from a `LegacyControlRequest::BuildLogSubscribe` message.
BuildLogSubscribe {
/// Build id taken from the request.
build_id: BuildId,
/// Log level filter taken from the request.
level: log::LevelFilter,
/// Connection the request was received on.
connection: TcpStream,
},
/// A failure reported as an event (e.g. binding the listener or accepting
/// a connection failed); see the `From<eyre::Report>` impl.
Error(eyre::Report),
}
/// Lets an `eyre::Report` be turned into a [`ControlEvent`] with `.into()`.
impl From<eyre::Report> for ControlEvent {
    /// Wraps `report` in the `Error` variant without modifying it.
    fn from(report: eyre::Report) -> Self {
        Self::Error(report)
    }
}