dora_coordinator/
control.rs1use crate::{
2 tcp_utils::{tcp_receive, tcp_send},
3 Event,
4};
5use dora_message::{cli_to_coordinator::ControlRequest, coordinator_to_cli::ControlRequestReply};
6use eyre::{eyre, Context};
7use futures::{
8 future::{self, Either},
9 stream::FuturesUnordered,
10 FutureExt, Stream, StreamExt,
11};
12use futures_concurrency::future::Race;
13use std::{io::ErrorKind, net::SocketAddr};
14use tokio::{
15 net::{TcpListener, TcpStream},
16 sync::{mpsc, oneshot},
17 task::JoinHandle,
18};
19use tokio_stream::wrappers::ReceiverStream;
20use uuid::Uuid;
21
22pub(crate) async fn control_events(
23 control_listen_addr: SocketAddr,
24 tasks: &FuturesUnordered<JoinHandle<()>>,
25) -> eyre::Result<impl Stream<Item = Event>> {
26 let (tx, rx) = mpsc::channel(10);
27
28 let (finish_tx, mut finish_rx) = mpsc::channel(1);
29 tasks.push(tokio::spawn(listen(control_listen_addr, tx, finish_tx)));
30 tasks.push(tokio::spawn(async move {
31 while let Some(()) = finish_rx.recv().await {}
32 }));
33
34 Ok(ReceiverStream::new(rx).map(Event::Control))
35}
36
37async fn listen(
38 control_listen_addr: SocketAddr,
39 tx: mpsc::Sender<ControlEvent>,
40 _finish_tx: mpsc::Sender<()>,
41) {
42 let result = TcpListener::bind(control_listen_addr)
43 .await
44 .wrap_err("failed to listen for control messages");
45 let incoming = match result {
46 Ok(incoming) => incoming,
47 Err(err) => {
48 let _ = tx.send(err.into()).await;
49 return;
50 }
51 };
52
53 loop {
54 let new_connection = incoming.accept().map(Either::Left);
55 let coordinator_stop = tx.closed().map(Either::Right);
56 let connection = match (new_connection, coordinator_stop).race().await {
57 future::Either::Left(connection) => connection,
58 future::Either::Right(()) => {
59 break;
61 }
62 };
63 match connection.wrap_err("failed to connect") {
64 Ok((connection, _)) => {
65 let tx = tx.clone();
66 tokio::spawn(handle_requests(connection, tx, _finish_tx.clone()));
67 }
68 Err(err) => {
69 if tx.blocking_send(err.into()).is_err() {
70 break;
71 }
72 }
73 }
74 }
75}
76
77async fn handle_requests(
78 mut connection: TcpStream,
79 tx: mpsc::Sender<ControlEvent>,
80 _finish_tx: mpsc::Sender<()>,
81) {
82 loop {
83 let next_request = tcp_receive(&mut connection).map(Either::Left);
84 let coordinator_stopped = tx.closed().map(Either::Right);
85 let raw = match (next_request, coordinator_stopped).race().await {
86 Either::Right(()) => break,
87 Either::Left(request) => match request {
88 Ok(message) => message,
89 Err(err) => match err.kind() {
90 ErrorKind::UnexpectedEof => {
91 tracing::trace!("Control connection closed");
92 break;
93 }
94 err => {
95 let err = eyre!(err).wrap_err("failed to receive incoming message");
96 tracing::error!("{err}");
97 break;
98 }
99 },
100 },
101 };
102
103 let request =
104 serde_json::from_slice(&raw).wrap_err("failed to deserialize incoming message");
105
106 if let Ok(ControlRequest::LogSubscribe { dataflow_id, level }) = request {
107 let _ = tx
108 .send(ControlEvent::LogSubscribe {
109 dataflow_id,
110 level,
111 connection,
112 })
113 .await;
114 break;
115 }
116
117 let result = match request {
118 Ok(request) => handle_request(request, &tx).await,
119 Err(err) => Err(err),
120 };
121
122 let reply = result.unwrap_or_else(|err| ControlRequestReply::Error(format!("{err:?}")));
123 let serialized: Vec<u8> =
124 match serde_json::to_vec(&reply).wrap_err("failed to serialize ControlRequestReply") {
125 Ok(s) => s,
126 Err(err) => {
127 tracing::error!("{err:?}");
128 break;
129 }
130 };
131 match tcp_send(&mut connection, &serialized).await {
132 Ok(()) => {}
133 Err(err) => match err.kind() {
134 ErrorKind::UnexpectedEof => {
135 tracing::debug!("Control connection closed while trying to send reply");
136 break;
137 }
138 err => {
139 let err = eyre!(err).wrap_err("failed to send reply");
140 tracing::error!("{err}");
141 break;
142 }
143 },
144 }
145
146 if matches!(reply, ControlRequestReply::CoordinatorStopped) {
147 break;
148 }
149 }
150}
151
152async fn handle_request(
153 request: ControlRequest,
154 tx: &mpsc::Sender<ControlEvent>,
155) -> eyre::Result<ControlRequestReply> {
156 let (reply_tx, reply_rx) = oneshot::channel();
157 let event = ControlEvent::IncomingRequest {
158 request,
159 reply_sender: reply_tx,
160 };
161
162 if tx.send(event).await.is_err() {
163 return Ok(ControlRequestReply::CoordinatorStopped);
164 }
165
166 reply_rx
167 .await
168 .unwrap_or(Ok(ControlRequestReply::CoordinatorStopped))
169}
170
/// Events produced by the control listener and its connection handlers.
#[derive(Debug)]
pub enum ControlEvent {
    /// A request received on a control connection.
    ///
    /// The result sent through `reply_sender` is serialized and written back
    /// to the client; an `Err` is reported to the client as an error reply.
    IncomingRequest {
        request: ControlRequest,
        reply_sender: oneshot::Sender<eyre::Result<ControlRequestReply>>,
    },
    /// A client asked to subscribe to the logs of a dataflow.
    ///
    /// The connection handler gives up ownership of `connection` and stops
    /// reading from it once this event has been emitted.
    LogSubscribe {
        dataflow_id: Uuid,
        level: log::LevelFilter,
        connection: TcpStream,
    },
    /// The listener failed to bind its address or to accept a connection.
    Error(eyre::Report),
}
184
185impl From<eyre::Report> for ControlEvent {
186 fn from(err: eyre::Report) -> Self {
187 ControlEvent::Error(err)
188 }
189}