dora_coordinator/
control.rs1use crate::{
2 Event,
3 tcp_utils::{tcp_receive, tcp_send},
4};
5use dora_message::{BuildId, cli_to_coordinator::LegacyControlRequest};
6use eyre::{Context, eyre};
7use futures::{
8 FutureExt, Stream, StreamExt,
9 future::{self, Either},
10 stream::FuturesUnordered,
11};
12use futures_concurrency::future::Race;
13use std::{io::ErrorKind, net::SocketAddr};
14use tokio::{
15 net::{TcpListener, TcpStream},
16 sync::mpsc,
17 task::JoinHandle,
18};
19use tokio_stream::wrappers::ReceiverStream;
20use uuid::Uuid;
21
22pub(crate) async fn control_events(
23 control_listen_addr: SocketAddr,
24 tasks: &FuturesUnordered<JoinHandle<()>>,
25) -> eyre::Result<impl Stream<Item = Event> + use<>> {
26 let (tx, rx) = mpsc::channel(10);
27
28 let (finish_tx, mut finish_rx) = mpsc::channel(1);
29 tasks.push(tokio::spawn(listen(control_listen_addr, tx, finish_tx)));
30 tasks.push(tokio::spawn(async move {
31 while let Some(()) = finish_rx.recv().await {}
32 }));
33
34 Ok(ReceiverStream::new(rx).map(Event::Control))
35}
36
37async fn listen(
38 control_listen_addr: SocketAddr,
39 tx: mpsc::Sender<ControlEvent>,
40 _finish_tx: mpsc::Sender<()>,
41) {
42 let result = TcpListener::bind(control_listen_addr)
43 .await
44 .wrap_err("failed to listen for control messages");
45 let incoming = match result {
46 Ok(incoming) => incoming,
47 Err(err) => {
48 let _ = tx.send(err.into()).await;
49 return;
50 }
51 };
52
53 loop {
54 let new_connection = incoming.accept().map(Either::Left);
55 let coordinator_stop = tx.closed().map(Either::Right);
56 let connection = match (new_connection, coordinator_stop).race().await {
57 future::Either::Left(connection) => connection,
58 future::Either::Right(()) => {
59 break;
61 }
62 };
63 match connection.wrap_err("failed to connect") {
64 Ok((connection, _)) => {
65 let tx = tx.clone();
66 tokio::spawn(handle_requests(connection, tx, _finish_tx.clone()));
67 }
68 Err(err) => {
69 if tx.blocking_send(err.into()).is_err() {
70 break;
71 }
72 }
73 }
74 }
75}
76
77async fn handle_requests(
78 mut connection: TcpStream,
79 tx: mpsc::Sender<ControlEvent>,
80 _finish_tx: mpsc::Sender<()>,
81) {
82 let _peer_addr = connection.peer_addr().ok();
83
84 let next_request = tcp_receive(&mut connection).map(Either::Left);
85 let coordinator_stopped = tx.closed().map(Either::Right);
86 let raw = match (next_request, coordinator_stopped).race().await {
87 Either::Right(()) => return,
88 Either::Left(request) => match request {
89 Ok(message) => message,
90 Err(err) => match err.kind() {
91 ErrorKind::UnexpectedEof => {
92 tracing::trace!("Control connection closed");
93 return;
94 }
95 err => {
96 let err = eyre!(err).wrap_err("failed to receive incoming message");
97 tracing::error!("{err}");
98 return;
99 }
100 },
101 },
102 };
103
104 let request = serde_json::from_slice(&raw).wrap_err("failed to deserialize incoming message");
105
106 match request {
107 Ok(request) => match request {
108 LegacyControlRequest::LogSubscribe { dataflow_id, level } => {
109 let _ = tx
110 .send(ControlEvent::LogSubscribe {
111 dataflow_id,
112 level,
113 connection,
114 })
115 .await;
116 }
117 LegacyControlRequest::BuildLogSubscribe { build_id, level } => {
118 let _ = tx
119 .send(ControlEvent::BuildLogSubscribe {
120 build_id,
121 level,
122 connection,
123 })
124 .await;
125 }
126 },
127 Err(err) => {
128 tracing::warn!("failed to parse incoming control message: {:#?}", err);
129 let reply = format!("failed to parse message: {err:?}");
130 let serialized: Vec<u8> =
131 match serde_json::to_vec(&reply).wrap_err("failed to serialize message") {
132 Ok(s) => s,
133 Err(err) => {
134 tracing::error!("{err:?}");
135 return;
136 }
137 };
138 match tcp_send(&mut connection, &serialized).await {
139 Ok(()) => {}
140 Err(err) => match err.kind() {
141 ErrorKind::UnexpectedEof => {
142 tracing::debug!("Control connection closed while trying to send reply");
143 }
144 err => {
145 let err = eyre!(err).wrap_err("failed to send reply");
146 tracing::error!("{err}");
147 }
148 },
149 };
150 }
151 }
152}
153
154#[derive(Debug)]
155pub enum ControlEvent {
156 LogSubscribe {
157 dataflow_id: Uuid,
158 level: log::LevelFilter,
159 connection: TcpStream,
160 },
161 BuildLogSubscribe {
162 build_id: BuildId,
163 level: log::LevelFilter,
164 connection: TcpStream,
165 },
166 Error(eyre::Report),
167}
168
169impl From<eyre::Report> for ControlEvent {
170 fn from(err: eyre::Report) -> Self {
171 ControlEvent::Error(err)
172 }
173}