dora_coordinator/
control.rs1use crate::{
2 Event,
3 tcp_utils::{tcp_receive, tcp_send},
4};
5use dora_message::{
6 BuildId, cli_to_coordinator::ControlRequest, coordinator_to_cli::ControlRequestReply,
7};
8use eyre::{Context, eyre};
9use futures::{
10 FutureExt, Stream, StreamExt,
11 future::{self, Either},
12 stream::FuturesUnordered,
13};
14use futures_concurrency::future::Race;
15use std::{io::ErrorKind, net::SocketAddr};
16use tokio::{
17 net::{TcpListener, TcpStream},
18 sync::{mpsc, oneshot},
19 task::JoinHandle,
20};
21use tokio_stream::wrappers::ReceiverStream;
22use uuid::Uuid;
23
24pub(crate) async fn control_events(
25 control_listen_addr: SocketAddr,
26 tasks: &FuturesUnordered<JoinHandle<()>>,
27) -> eyre::Result<impl Stream<Item = Event> + use<>> {
28 let (tx, rx) = mpsc::channel(10);
29
30 let (finish_tx, mut finish_rx) = mpsc::channel(1);
31 tasks.push(tokio::spawn(listen(control_listen_addr, tx, finish_tx)));
32 tasks.push(tokio::spawn(async move {
33 while let Some(()) = finish_rx.recv().await {}
34 }));
35
36 Ok(ReceiverStream::new(rx).map(Event::Control))
37}
38
39async fn listen(
40 control_listen_addr: SocketAddr,
41 tx: mpsc::Sender<ControlEvent>,
42 _finish_tx: mpsc::Sender<()>,
43) {
44 let result = TcpListener::bind(control_listen_addr)
45 .await
46 .wrap_err("failed to listen for control messages");
47 let incoming = match result {
48 Ok(incoming) => incoming,
49 Err(err) => {
50 let _ = tx.send(err.into()).await;
51 return;
52 }
53 };
54
55 loop {
56 let new_connection = incoming.accept().map(Either::Left);
57 let coordinator_stop = tx.closed().map(Either::Right);
58 let connection = match (new_connection, coordinator_stop).race().await {
59 future::Either::Left(connection) => connection,
60 future::Either::Right(()) => {
61 break;
63 }
64 };
65 match connection.wrap_err("failed to connect") {
66 Ok((connection, _)) => {
67 let tx = tx.clone();
68 tokio::spawn(handle_requests(connection, tx, _finish_tx.clone()));
69 }
70 Err(err) => {
71 if tx.blocking_send(err.into()).is_err() {
72 break;
73 }
74 }
75 }
76 }
77}
78
79async fn handle_requests(
80 mut connection: TcpStream,
81 tx: mpsc::Sender<ControlEvent>,
82 _finish_tx: mpsc::Sender<()>,
83) {
84 let peer_addr = connection.peer_addr().ok();
85 loop {
86 let next_request = tcp_receive(&mut connection).map(Either::Left);
87 let coordinator_stopped = tx.closed().map(Either::Right);
88 let raw = match (next_request, coordinator_stopped).race().await {
89 Either::Right(()) => break,
90 Either::Left(request) => match request {
91 Ok(message) => message,
92 Err(err) => match err.kind() {
93 ErrorKind::UnexpectedEof => {
94 tracing::trace!("Control connection closed");
95 break;
96 }
97 err => {
98 let err = eyre!(err).wrap_err("failed to receive incoming message");
99 tracing::error!("{err}");
100 break;
101 }
102 },
103 },
104 };
105
106 let request =
107 serde_json::from_slice(&raw).wrap_err("failed to deserialize incoming message");
108
109 if let Ok(ControlRequest::LogSubscribe { dataflow_id, level }) = request {
110 let _ = tx
111 .send(ControlEvent::LogSubscribe {
112 dataflow_id,
113 level,
114 connection,
115 })
116 .await;
117 break;
118 }
119
120 if let Ok(ControlRequest::BuildLogSubscribe { build_id, level }) = request {
121 let _ = tx
122 .send(ControlEvent::BuildLogSubscribe {
123 build_id,
124 level,
125 connection,
126 })
127 .await;
128 break;
129 }
130
131 let mut result = match request {
132 Ok(request) => handle_request(request, &tx).await,
133 Err(err) => Err(err),
134 };
135
136 if let Ok(ControlRequestReply::CliAndDefaultDaemonIps { cli, .. }) = &mut result {
137 if cli.is_none() {
138 *cli = peer_addr.map(|s| s.ip());
140 }
141 }
142
143 let reply = result.unwrap_or_else(|err| ControlRequestReply::Error(format!("{err:?}")));
144 let serialized: Vec<u8> =
145 match serde_json::to_vec(&reply).wrap_err("failed to serialize ControlRequestReply") {
146 Ok(s) => s,
147 Err(err) => {
148 tracing::error!("{err:?}");
149 break;
150 }
151 };
152 match tcp_send(&mut connection, &serialized).await {
153 Ok(()) => {}
154 Err(err) => match err.kind() {
155 ErrorKind::UnexpectedEof => {
156 tracing::debug!("Control connection closed while trying to send reply");
157 break;
158 }
159 err => {
160 let err = eyre!(err).wrap_err("failed to send reply");
161 tracing::error!("{err}");
162 break;
163 }
164 },
165 }
166
167 if matches!(reply, ControlRequestReply::CoordinatorStopped) {
168 break;
169 }
170 }
171}
172
173async fn handle_request(
174 request: ControlRequest,
175 tx: &mpsc::Sender<ControlEvent>,
176) -> eyre::Result<ControlRequestReply> {
177 let (reply_tx, reply_rx) = oneshot::channel();
178 let event = ControlEvent::IncomingRequest {
179 request: request.clone(),
180 reply_sender: reply_tx,
181 };
182
183 if tx.send(event).await.is_err() {
184 return Ok(ControlRequestReply::CoordinatorStopped);
185 }
186
187 reply_rx
188 .await
189 .wrap_err_with(|| format!("no coordinator reply to {request:?}"))?
190}
191
192#[derive(Debug)]
193pub enum ControlEvent {
194 IncomingRequest {
195 request: ControlRequest,
196 reply_sender: oneshot::Sender<eyre::Result<ControlRequestReply>>,
197 },
198 LogSubscribe {
199 dataflow_id: Uuid,
200 level: log::LevelFilter,
201 connection: TcpStream,
202 },
203 BuildLogSubscribe {
204 build_id: BuildId,
205 level: log::LevelFilter,
206 connection: TcpStream,
207 },
208 Error(eyre::Report),
209}
210
211impl From<eyre::Report> for ControlEvent {
212 fn from(err: eyre::Report) -> Self {
213 ControlEvent::Error(err)
214 }
215}